Whamcloud - gitweb
b=21485 Keep in-memory client data until export destroy
authorMike Tappro <tappro@sun.com>
Wed, 3 Mar 2010 01:32:45 +0000 (17:32 -0800)
committerRobert Read <rread@sun.com>
Wed, 3 Mar 2010 01:32:45 +0000 (17:32 -0800)
Client data on disk is removed upon disconnect but in-memory data is needed
while export is still used and must be kept until its destroy.

i=adilger
i=rread

lustre/include/lu_target.h
lustre/include/lustre_disk.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/ptlrpc/target.c

index 811f47e..09c98a6 100644 (file)
@@ -56,11 +56,9 @@ struct lu_target {
         /** Lock protecting client bitmap */
         cfs_spinlock_t           lut_client_bitmap_lock;
         /** Bitmap of known clients */
         /** Lock protecting client bitmap */
         cfs_spinlock_t           lut_client_bitmap_lock;
         /** Bitmap of known clients */
-        unsigned long            lut_client_bitmap[LR_CLIENT_BITMAP_SIZE];
+        unsigned long           *lut_client_bitmap;
         /** Number of mounts */
         __u64                    lut_mount_count;
         /** Number of mounts */
         __u64                    lut_mount_count;
-        __u32                    lut_stale_export_age;
-        cfs_spinlock_t           lut_trans_table_lock;
 };
 
 typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno,
 };
 
 typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno,
@@ -76,5 +74,5 @@ void lut_cb_client(struct lu_target *, __u64, void *, int);
 int lut_init(const struct lu_env *, struct lu_target *,
              struct obd_device *, struct dt_device *);
 void lut_fini(const struct lu_env *, struct lu_target *);
 int lut_init(const struct lu_env *, struct lu_target *,
              struct obd_device *, struct dt_device *);
 void lut_fini(const struct lu_env *, struct lu_target *);
-
+void lut_client_free(struct obd_export *);
 #endif /* __LUSTRE_LU_TARGET_H */
 #endif /* __LUSTRE_LU_TARGET_H */
index 8141fce..0a456db 100644 (file)
@@ -217,8 +217,6 @@ struct lustre_mount_data {
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
-#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long))
-
 /** COMPAT_146: this is an OST (temporary) */
 #define OBD_COMPAT_OST          0x00000002
 /** COMPAT_146: this is an MDT (temporary) */
 /** COMPAT_146: this is an OST (temporary) */
 #define OBD_COMPAT_OST          0x00000002
 /** COMPAT_146: this is an MDT (temporary) */
index 454f383..bedb530 100644 (file)
@@ -232,17 +232,7 @@ struct obd_device_target {
         struct super_block       *obt_sb;
         /** last_rcvd file */
         struct file              *obt_rcvd_filp;
         struct super_block       *obt_sb;
         /** last_rcvd file */
         struct file              *obt_rcvd_filp;
-        /** server data in last_rcvd file */
-        struct lr_server_data    *obt_lsd;
-        /** Lock protecting client bitmap */
-        cfs_spinlock_t            obt_client_bitmap_lock;
-        /** Bitmap of known clients */
-        unsigned long            *obt_client_bitmap;
-        /** Server last transaction number */
-        __u64                     obt_last_transno;
-        /** Lock protecting last transaction number */
-        cfs_spinlock_t            obt_translock;
-        /** Number of mounts */
+        struct lu_target         *obt_lut;
         __u64                     obt_mount_count;
         cfs_semaphore_t           obt_quotachecking;
         struct lustre_quota_ctxt  obt_qctxt;
         __u64                     obt_mount_count;
         cfs_semaphore_t           obt_quotachecking;
         struct lustre_quota_ctxt  obt_qctxt;
@@ -287,7 +277,6 @@ struct filter_ext {
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
-        struct lu_target     fo_lut;
         const char          *fo_fstype;
 
         int                  fo_group_count;
         const char          *fo_fstype;
 
         int                  fo_group_count;
@@ -368,11 +357,10 @@ struct filter_obd {
         int                      fo_sec_level;
 };
 
         int                      fo_sec_level;
 };
 
-#define fo_translock            fo_obt.obt_translock
+#define fo_translock            fo_obt.obt_lut->lut_translock
+#define fo_last_rcvd_slots      fo_obt.obt_lut->lut_client_bitmap
+#define fo_mount_count          fo_obt.obt_lut->lut_mount_count
 #define fo_rcvd_filp            fo_obt.obt_rcvd_filp
 #define fo_rcvd_filp            fo_obt.obt_rcvd_filp
-#define fo_fsd                  fo_obt.obt_lsd
-#define fo_last_rcvd_slots      fo_obt.obt_client_bitmap
-#define fo_mount_count          fo_obt.obt_mount_count
 #define fo_vfsmnt               fo_obt.obt_vfsmnt
 
 struct timeout_item {
 #define fo_vfsmnt               fo_obt.obt_vfsmnt
 
 struct timeout_item {
index 550e0a8..0a16ece 100644 (file)
@@ -103,7 +103,6 @@ void obd_zombie_impexp_stop(void);
 void obd_zombie_impexp_cull(void);
 void obd_zombie_barrier(void);
 void obd_exports_barrier(struct obd_device *obd);
 void obd_zombie_impexp_cull(void);
 void obd_zombie_barrier(void);
 void obd_exports_barrier(struct obd_device *obd);
-
 /* obd_config.c */
 int class_process_config(struct lustre_cfg *lcfg);
 int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
 /* obd_config.c */
 int class_process_config(struct lustre_cfg *lcfg);
 int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
@@ -272,6 +271,18 @@ static inline enum obd_option exp_flags_from_obd(struct obd_device *obd)
                 0);
 }
 
                 0);
 }
 
+static inline struct lu_target *class_exp2tgt(struct obd_export *exp)
+{
+        LASSERT(exp->exp_obd);
+        return exp->exp_obd->u.obt.obt_lut;
+}
+
+static inline struct lr_server_data *class_server_data(struct obd_device *obd)
+{
+        LASSERT(obd->u.obt.obt_lut);
+        return &obd->u.obt.obt_lut->lut_lsd;
+}
+
 void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid);
 void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj);
 void obdo_from_iattr(struct obdo *oa, struct iattr *attr,
 void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid);
 void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj);
 void obdo_from_iattr(struct obdo *oa, struct iattr *attr,
index ed96c2b..f8a6f9b 100644 (file)
@@ -4498,8 +4498,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
-        cfs_spin_lock_init(&m->mdt_transno_lock);
-
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
         m->mdt_som_conf = 0;
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
         m->mdt_som_conf = 0;
@@ -4539,8 +4537,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_nosquash_strlen = 0;
         cfs_init_rwsem(&m->mdt_squash_sem);
 
         m->mdt_nosquash_strlen = 0;
         cfs_init_rwsem(&m->mdt_squash_sem);
 
-        cfs_spin_lock_init(&m->mdt_client_bitmap_lock);
-
         OBD_ALLOC_PTR(mite);
         if (mite == NULL)
                 GOTO(err_lmi, rc = -ENOMEM);
         OBD_ALLOC_PTR(mite);
         if (mite == NULL)
                 GOTO(err_lmi, rc = -ENOMEM);
@@ -5211,6 +5207,7 @@ static int mdt_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
+        lut_client_free(exp);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 9c1a885..f41651a 100644 (file)
@@ -703,25 +703,15 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
                 GOTO(free, rc = PTR_ERR(th));
 
         cfs_mutex_down(&med->med_lcd_lock);
                 GOTO(free, rc = PTR_ERR(th));
 
         cfs_mutex_down(&med->med_lcd_lock);
-        memset(lcd, 0, sizeof *lcd);
+        memset(lcd->lcd_uuid, 0, sizeof lcd->lcd_uuid);
         rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
         rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
-        med->med_lcd = NULL;
         cfs_mutex_up(&med->med_lcd_lock);
         mdt_trans_stop(env, mdt, th);
 
         cfs_mutex_up(&med->med_lcd_lock);
         mdt_trans_stop(env, mdt, th);
 
-        cfs_spin_lock(&mdt->mdt_client_bitmap_lock);
-        cfs_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
-        cfs_spin_unlock(&mdt->mdt_client_bitmap_lock);
-
         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
                "%s, rc %d\n",  med->med_lr_idx, LAST_RCVD, rc);
         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
                "%s, rc %d\n",  med->med_lr_idx, LAST_RCVD, rc);
-        OBD_FREE_PTR(lcd);
         RETURN(0);
 free:
         RETURN(0);
 free:
-        cfs_mutex_down(&med->med_lcd_lock);
-        med->med_lcd = NULL;
-        cfs_mutex_up(&med->med_lcd_lock);
-        OBD_FREE_PTR(lcd);
         return 0;
 }
 
         return 0;
 }
 
index 4aed2ae..8043556 100644 (file)
@@ -122,6 +122,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(exp->exp_obd);
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
@@ -142,22 +143,19 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
-                err = filter_update_server_data(exp->exp_obd,
-                                                filter->fo_rcvd_filp,
-                                                filter->fo_fsd);
+                err = filter_update_server_data(exp->exp_obd);
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
-                filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1;
+                fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         } else {
                 last_rcvd = oti->oti_transno;
         } else {
                 last_rcvd = oti->oti_transno;
-                if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
-                        filter->fo_fsd->lsd_last_transno =
-                                cpu_to_le64(last_rcvd);
+                if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
+                        fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         }
         oti->oti_transno = last_rcvd;
 
         }
         oti->oti_transno = last_rcvd;
 
@@ -313,6 +311,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(obd);
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
@@ -350,8 +349,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         }
 
         fed->fed_lr_idx = cl_idx;
         }
 
         fed->fed_lr_idx = cl_idx;
-        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
-                cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
+        fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) +
+                          cl_idx * le16_to_cpu(fsd->lsd_client_size);
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
@@ -376,8 +375,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
-                        fed->fed_lcd->lcd_last_epoch =
-                                              filter->fo_fsd->lsd_start_epoch;
+                        fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch;
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
@@ -406,9 +404,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
-
-static int filter_client_free(struct obd_export *exp)
+static int filter_client_del(struct obd_export *exp)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
@@ -453,12 +449,13 @@ static int filter_client_free(struct obd_export *exp)
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
-        filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd);
+        filter_update_server_data(obd);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
-        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd,
-                                 sizeof(zero_lcd), &off, 0);
-        fed->fed_lcd = NULL;
+        memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid);
+        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+                                 fed->fed_lcd,
+                                 sizeof(*fed->fed_lcd), &off, 0);
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
@@ -466,21 +463,8 @@ static int filter_client_free(struct obd_export *exp)
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
-
-        if (!cfs_test_and_clear_bit(fed->fed_lr_idx,
-                                    filter->fo_last_rcvd_slots)) {
-                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
-                       fed->fed_lr_idx);
-                LBUG();
-        }
-        OBD_FREE_PTR(lcd);
-        RETURN(0);
+        EXIT;
 free:
 free:
-        cfs_mutex_down(&fed->fed_lcd_lock);
-        fed->fed_lcd = NULL;
-        cfs_mutex_up(&fed->fed_lcd_lock);
-        OBD_FREE_PTR(lcd);
-
         return 0;
 }
 
         return 0;
 }
 
@@ -670,17 +654,16 @@ static int filter_init_export(struct obd_export *exp)
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
-        OBD_FREE_PTR(filter->fo_fsd);
-        filter->fo_fsd = NULL;
-        OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        filter->fo_last_rcvd_slots = NULL;
+        lut_fini(NULL, filter->fo_obt.obt_lut);
+        OBD_FREE_PTR(filter->fo_obt.obt_lut);
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd, struct file *filp,
-                              struct lr_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd)
 {
 {
+        struct file *filp = obd->u.obt.obt_rcvd_filp;
+        struct lr_server_data *fsd = class_server_data(obd);
         loff_t off = 0;
         int rc;
         ENTRY;
         loff_t off = 0;
         int rc;
         ENTRY;
@@ -733,6 +716,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
+        struct lu_target *lut;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
@@ -745,17 +729,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
-        OBD_ALLOC(fsd, sizeof(*fsd));
-        if (!fsd)
-                RETURN(-ENOMEM);
-        filter->fo_fsd = fsd;
-
-        OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        if (filter->fo_last_rcvd_slots == NULL) {
-                OBD_FREE(fsd, sizeof(*fsd));
+        /* allocate and initialize lu_target */
+        OBD_ALLOC_PTR(lut);
+        if (lut == NULL)
                 RETURN(-ENOMEM);
                 RETURN(-ENOMEM);
-        }
-
+        rc = lut_init(NULL, lut, obd, NULL);
+        if (rc)
+                GOTO(err_fsd, rc);
+        fsd = class_server_data(obd);
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
@@ -929,7 +910,7 @@ out:
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
-        rc = filter_update_server_data(obd, filp, filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 GOTO(err_client, rc);
 
         if (rc)
                 GOTO(err_client, rc);
 
@@ -1317,16 +1298,13 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
-        /** lu_target has very limited use in filter now */
-        lut_init(NULL, &filter->fo_lut, obd, NULL);
-
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
-
-        target_recovery_init(&filter->fo_lut, ost_handle);
+        LASSERT(obd->u.obt.obt_lut);
+        target_recovery_init(obd->u.obt.obt_lut, ost_handle);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
@@ -1380,8 +1358,7 @@ static void filter_post(struct obd_device *obd)
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
@@ -1414,7 +1391,6 @@ static void filter_post(struct obd_device *obd)
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
@@ -1425,7 +1401,6 @@ static void filter_set_last_id(struct filter_obd *filter,
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
         LASSERT(filter->fo_last_objids != NULL);
 
         LASSERT(group <= filter->fo_group_count);
         LASSERT(filter->fo_last_objids != NULL);
 
@@ -2068,7 +2043,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
-        cfs_spin_lock_init(&filter->fo_translock);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
@@ -2704,8 +2678,7 @@ static int filter_connect_internal(struct obd_export *exp,
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
-                struct filter_obd *filter = &exp->exp_obd->u.filter;
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(exp->exp_obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
@@ -2715,8 +2688,7 @@ static int filter_connect_internal(struct obd_export *exp,
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
-                        filter_update_server_data(exp->exp_obd,
-                                                  filter->fo_rcvd_filp, lsd);
+                        filter_update_server_data(exp->exp_obd);
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
@@ -2972,12 +2944,13 @@ static void filter_grant_discard(struct obd_export *exp)
 
 static int filter_destroy_export(struct obd_export *exp)
 {
 
 static int filter_destroy_export(struct obd_export *exp)
 {
+        struct filter_export_data *fed = &exp->exp_filter_data;
         ENTRY;
 
         ENTRY;
 
-        if (exp->exp_filter_data.fed_pending)
+        if (fed->fed_pending)
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
-                       exp, exp->exp_filter_data.fed_pending);
+                       exp, fed->fed_pending);
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
@@ -2987,6 +2960,7 @@ static int filter_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
+        lut_client_free(exp);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
@@ -3082,7 +3056,7 @@ static int filter_disconnect(struct obd_export *exp)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
-                filter_client_free(exp);
+                filter_client_del(exp);
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
@@ -3714,7 +3688,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||
index d3df199..36bf7ae 100644 (file)
@@ -138,8 +138,7 @@ __u64 filter_next_id(struct filter_obd *, struct obdo *);
 __u64 filter_last_id(struct filter_obd *, obd_gr group);
 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
                         void *handle, struct obdo *oa);
 __u64 filter_last_id(struct filter_obd *, obd_gr group);
 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
                         void *handle, struct obdo *oa);
-int filter_update_server_data(struct obd_device *, struct file *,
-                              struct lr_server_data *);
+int filter_update_server_data(struct obd_device *);
 int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg,
                         void *option);
 int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg,
                         void *option);
index 4704c20..2b2d522 100644 (file)
@@ -41,6 +41,7 @@
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
+
 /**
  * Update client data in last_rcvd file. An obd API
  */
 /**
  * Update client data in last_rcvd file. An obd API
  */
@@ -48,6 +49,7 @@ static int obt_client_data_update(struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
         struct obd_device_target *obt = &exp->exp_obd->u.obt;
 {
         struct lu_export_data *led = &exp->exp_target_data;
         struct obd_device_target *obt = &exp->exp_obd->u.obt;
+        struct lu_target *lut = class_exp2tgt(exp);
         loff_t off = led->led_lr_off;
         int rc = 0;
 
         loff_t off = led->led_lr_off;
         int rc = 0;
 
@@ -56,7 +58,7 @@ static int obt_client_data_update(struct obd_export *exp)
 
         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
 
         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
-               le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
+               le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
 
         return rc;
 }
 
         return rc;
 }
@@ -64,21 +66,22 @@ static int obt_client_data_update(struct obd_export *exp)
 /**
  * Update server data in last_rcvd file. An obd API
  */
 /**
  * Update server data in last_rcvd file. An obd API
  */
-int obt_server_data_update(struct obd_device *obd, int force_sync)
+int obt_server_data_update(struct lu_target *lut, int force_sync)
 {
 {
-        struct obd_device_target *obt = &obd->u.obt;
+        struct obd_device_target *obt = &lut->lut_obd->u.obt;
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
-               obt->obt_lsd->lsd_uuid,
-               le64_to_cpu(obt->obt_lsd->lsd_mount_count),
-               le64_to_cpu(obt->obt_lsd->lsd_last_transno));
+               lut->lut_lsd.lsd_uuid,
+               le64_to_cpu(lut->lut_lsd.lsd_mount_count),
+               le64_to_cpu(lut->lut_lsd.lsd_last_transno));
 
 
-        rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
-                                 sizeof(*obt->obt_lsd), &off, force_sync);
+        rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
+                                 &lut->lut_lsd, sizeof(lut->lut_lsd),
+                                 &off, force_sync);
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
 
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
 
@@ -91,32 +94,32 @@ int obt_server_data_update(struct obd_device *obd, int force_sync)
 void obt_client_epoch_update(struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
 void obt_client_epoch_update(struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
-        struct obd_device_target *obt = &exp->exp_obd->u.obt;
+        struct lu_target *lut = class_exp2tgt(exp);
 
         /** VBR: set client last_epoch to current epoch */
         if (le32_to_cpu(lcd->lcd_last_epoch) >=
 
         /** VBR: set client last_epoch to current epoch */
         if (le32_to_cpu(lcd->lcd_last_epoch) >=
-            le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
+            le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
                 return;
                 return;
-        lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
+        lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
         obt_client_data_update(exp);
 }
 
 /**
  * Increment server epoch. An obd API
  */
         obt_client_data_update(exp);
 }
 
 /**
  * Increment server epoch. An obd API
  */
-static void obt_boot_epoch_update(struct obd_device *obd)
+static void obt_boot_epoch_update(struct lu_target *lut)
 {
 {
+        struct obd_device *obd = lut->lut_obd;
         __u32 start_epoch;
         __u32 start_epoch;
-        struct obd_device_target *obt = &obd->u.obt;
         struct ptlrpc_request *req;
         cfs_list_t client_list;
 
         struct ptlrpc_request *req;
         cfs_list_t client_list;
 
-        cfs_spin_lock(&obt->obt_translock);
-        start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
-        obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
+        cfs_spin_lock(&lut->lut_translock);
+        start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
+        lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
                                             LR_EPOCH_BITS);
                                             LR_EPOCH_BITS);
-        obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
-        cfs_spin_unlock(&obt->obt_translock);
+        lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
+        cfs_spin_unlock(&lut->lut_translock);
 
         CFS_INIT_LIST_HEAD(&client_list);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
 
         CFS_INIT_LIST_HEAD(&client_list);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
@@ -135,7 +138,7 @@ static void obt_boot_epoch_update(struct obd_device *obd)
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-        obt_server_data_update(obd, 1);
+        obt_server_data_update(lut, 1);
 }
 
 /**
 }
 
 /**
@@ -168,12 +171,33 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
 }
 
 /**
 }
 
 /**
+ * Free in-memory data for client slot related to export.
+ */
+void lut_client_free(struct obd_export *exp)
+{
+        struct lu_export_data *led = &exp->exp_target_data;
+        struct lu_target *lut = class_exp2tgt(exp);
+
+        OBD_FREE_PTR(led->led_lcd);
+        led->led_lcd = NULL;
+        /* Clear bit when lcd is freed */
+        spin_lock(&lut->lut_client_bitmap_lock);
+        if (!test_and_clear_bit(led->led_lr_idx, lut->lut_client_bitmap)) {
+                CERROR("%s: client %u bit already clear in bitmap\n",
+                       exp->exp_obd->obd_name, led->led_lr_idx);
+                LBUG();
+        }
+        spin_unlock(&lut->lut_client_bitmap_lock);
+}
+EXPORT_SYMBOL(lut_client_free);
+
+/**
  * Update client data in last_rcvd
  */
  * Update client data in last_rcvd
  */
-int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
-                            struct obd_export *exp)
+int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
 {
         struct lu_export_data *led = &exp->exp_target_data;
+        struct lu_target *lut = class_exp2tgt(exp);
         struct lsd_client_data tmp_lcd;
         loff_t tmp_off = led->led_lr_off;
         struct lu_buf tmp_buf = {
         struct lsd_client_data tmp_lcd;
         loff_t tmp_off = led->led_lr_off;
         struct lu_buf tmp_buf = {
@@ -219,17 +243,17 @@ static int lut_server_data_update(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
-                             struct obd_export *exp)
+void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+        struct lu_target *lut = class_exp2tgt(exp);
 
         LASSERT(lut->lut_bottom);
         /** VBR: set client last_epoch to current epoch */
         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
                 return;
         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
 
         LASSERT(lut->lut_bottom);
         /** VBR: set client last_epoch to current epoch */
         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
                 return;
         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
-        lut_client_data_update(env, lut, exp);
+        lut_client_data_update(env, exp);
 }
 
 /**
 }
 
 /**
@@ -247,7 +271,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
                 return;
         /** Increase server epoch after recovery */
         if (lut->lut_bottom == NULL)
                 return;
         /** Increase server epoch after recovery */
         if (lut->lut_bottom == NULL)
-                return obt_boot_epoch_update(lut->lut_obd);
+                return obt_boot_epoch_update(lut);
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
@@ -277,7 +301,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
         cfs_list_for_each_entry(req, &client_list, rq_list) {
                 LASSERT(!req->rq_export->exp_delayed);
                 if (!req->rq_export->exp_vbr_failed)
         cfs_list_for_each_entry(req, &client_list, rq_list) {
                 LASSERT(!req->rq_export->exp_delayed);
                 if (!req->rq_export->exp_vbr_failed)
-                        lut_client_epoch_update(&env, lut, req->rq_export);
+                        lut_client_epoch_update(&env, req->rq_export);
         }
         /** return list back at once */
         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
         }
         /** return list back at once */
         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
@@ -332,13 +356,19 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
+        LASSERT(lut);
+        LASSERT(obd);
         lut->lut_obd = obd;
         lut->lut_bottom = dt;
         lut->lut_last_rcvd = NULL;
         lut->lut_obd = obd;
         lut->lut_bottom = dt;
         lut->lut_last_rcvd = NULL;
+        obd->u.obt.obt_lut = lut;
 
         cfs_spin_lock_init(&lut->lut_translock);
         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
 
         cfs_spin_lock_init(&lut->lut_translock);
         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
-        cfs_spin_lock_init(&lut->lut_trans_table_lock);
+
+        OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+        if (lut->lut_client_bitmap == NULL)
+                RETURN(-ENOMEM);
 
         /** obdfilter has no lu_device stack yet */
         if (dt == NULL)
 
         /** obdfilter has no lu_device stack yet */
         if (dt == NULL)
@@ -347,6 +377,8 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         if (!IS_ERR(o)) {
                 lut->lut_last_rcvd = o;
         } else {
         if (!IS_ERR(o)) {
                 lut->lut_last_rcvd = o;
         } else {
+                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+                lut->lut_client_bitmap = NULL;
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
         }
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
         }
@@ -358,9 +390,15 @@ EXPORT_SYMBOL(lut_init);
 void lut_fini(const struct lu_env *env, struct lu_target *lut)
 {
         ENTRY;
 void lut_fini(const struct lu_env *env, struct lu_target *lut)
 {
         ENTRY;
-        if (lut->lut_last_rcvd)
+
+        if (lut->lut_client_bitmap) {
+                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+                lut->lut_client_bitmap = NULL;
+        }
+        if (lut->lut_last_rcvd) {
                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
-        lut->lut_last_rcvd = NULL;
+                lut->lut_last_rcvd = NULL;
+        }
         EXIT;
 }
 EXPORT_SYMBOL(lut_fini);
         EXIT;
 }
 EXPORT_SYMBOL(lut_fini);