Whamcloud - gitweb
Revert "b=21485 Keep in-memory client data until export destroy"
authorRobert Read <rread@sun.com>
Tue, 2 Mar 2010 02:08:23 +0000 (18:08 -0800)
committerRobert Read <rread@sun.com>
Tue, 2 Mar 2010 02:08:47 +0000 (18:08 -0800)
This reverts commit 64d7c036a5429a9ce5435aea96e30935b1cef44a.

Causes compile error:
.../lustre/include/lu_target.h:59: error: 'LR_CLIENT_BITMAP_SIZE' undeclared here (not in a function)

lustre/include/lustre_disk.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/ptlrpc/target.c

index 0a456db..8141fce 100644 (file)
@@ -217,6 +217,8 @@ struct lustre_mount_data {
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
 #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
 #endif
 
+#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long))
+
 /** COMPAT_146: this is an OST (temporary) */
 #define OBD_COMPAT_OST          0x00000002
 /** COMPAT_146: this is an MDT (temporary) */
 /** COMPAT_146: this is an OST (temporary) */
 #define OBD_COMPAT_OST          0x00000002
 /** COMPAT_146: this is an MDT (temporary) */
index bedb530..454f383 100644 (file)
@@ -232,7 +232,17 @@ struct obd_device_target {
         struct super_block       *obt_sb;
         /** last_rcvd file */
         struct file              *obt_rcvd_filp;
         struct super_block       *obt_sb;
         /** last_rcvd file */
         struct file              *obt_rcvd_filp;
-        struct lu_target         *obt_lut;
+        /** server data in last_rcvd file */
+        struct lr_server_data    *obt_lsd;
+        /** Lock protecting client bitmap */
+        cfs_spinlock_t            obt_client_bitmap_lock;
+        /** Bitmap of known clients */
+        unsigned long            *obt_client_bitmap;
+        /** Server last transaction number */
+        __u64                     obt_last_transno;
+        /** Lock protecting last transaction number */
+        cfs_spinlock_t            obt_translock;
+        /** Number of mounts */
         __u64                     obt_mount_count;
         cfs_semaphore_t           obt_quotachecking;
         struct lustre_quota_ctxt  obt_qctxt;
         __u64                     obt_mount_count;
         cfs_semaphore_t           obt_quotachecking;
         struct lustre_quota_ctxt  obt_qctxt;
@@ -277,6 +287,7 @@ struct filter_ext {
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
+        struct lu_target     fo_lut;
         const char          *fo_fstype;
 
         int                  fo_group_count;
         const char          *fo_fstype;
 
         int                  fo_group_count;
@@ -357,10 +368,11 @@ struct filter_obd {
         int                      fo_sec_level;
 };
 
         int                      fo_sec_level;
 };
 
-#define fo_translock            fo_obt.obt_lut->lut_translock
-#define fo_last_rcvd_slots      fo_obt.obt_lut->lut_client_bitmap
-#define fo_mount_count          fo_obt.obt_lut->lut_mount_count
+#define fo_translock            fo_obt.obt_translock
 #define fo_rcvd_filp            fo_obt.obt_rcvd_filp
 #define fo_rcvd_filp            fo_obt.obt_rcvd_filp
+#define fo_fsd                  fo_obt.obt_lsd
+#define fo_last_rcvd_slots      fo_obt.obt_client_bitmap
+#define fo_mount_count          fo_obt.obt_mount_count
 #define fo_vfsmnt               fo_obt.obt_vfsmnt
 
 struct timeout_item {
 #define fo_vfsmnt               fo_obt.obt_vfsmnt
 
 struct timeout_item {
index 0a16ece..550e0a8 100644 (file)
@@ -103,6 +103,7 @@ void obd_zombie_impexp_stop(void);
 void obd_zombie_impexp_cull(void);
 void obd_zombie_barrier(void);
 void obd_exports_barrier(struct obd_device *obd);
 void obd_zombie_impexp_cull(void);
 void obd_zombie_barrier(void);
 void obd_exports_barrier(struct obd_device *obd);
+
 /* obd_config.c */
 int class_process_config(struct lustre_cfg *lcfg);
 int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
 /* obd_config.c */
 int class_process_config(struct lustre_cfg *lcfg);
 int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
@@ -271,18 +272,6 @@ static inline enum obd_option exp_flags_from_obd(struct obd_device *obd)
                 0);
 }
 
                 0);
 }
 
-static inline struct lu_target *class_exp2tgt(struct obd_export *exp)
-{
-        LASSERT(exp->exp_obd);
-        return exp->exp_obd->u.obt.obt_lut;
-}
-
-static inline struct lr_server_data *class_server_data(struct obd_device *obd)
-{
-        LASSERT(obd->u.obt.obt_lut);
-        return &obd->u.obt.obt_lut->lut_lsd;
-}
-
 void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid);
 void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj);
 void obdo_from_iattr(struct obdo *oa, struct iattr *attr,
 void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid);
 void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj);
 void obdo_from_iattr(struct obdo *oa, struct iattr *attr,
index f8a6f9b..ed96c2b 100644 (file)
@@ -4498,6 +4498,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
+        cfs_spin_lock_init(&m->mdt_transno_lock);
+
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
         m->mdt_som_conf = 0;
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
         m->mdt_som_conf = 0;
@@ -4537,6 +4539,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_nosquash_strlen = 0;
         cfs_init_rwsem(&m->mdt_squash_sem);
 
         m->mdt_nosquash_strlen = 0;
         cfs_init_rwsem(&m->mdt_squash_sem);
 
+        cfs_spin_lock_init(&m->mdt_client_bitmap_lock);
+
         OBD_ALLOC_PTR(mite);
         if (mite == NULL)
                 GOTO(err_lmi, rc = -ENOMEM);
         OBD_ALLOC_PTR(mite);
         if (mite == NULL)
                 GOTO(err_lmi, rc = -ENOMEM);
@@ -5207,7 +5211,6 @@ static int mdt_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
-        lut_client_free(exp);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index f41651a..9c1a885 100644 (file)
@@ -703,15 +703,25 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
                 GOTO(free, rc = PTR_ERR(th));
 
         cfs_mutex_down(&med->med_lcd_lock);
                 GOTO(free, rc = PTR_ERR(th));
 
         cfs_mutex_down(&med->med_lcd_lock);
-        memset(lcd->lcd_uuid, 0, sizeof lcd->lcd_uuid);
+        memset(lcd, 0, sizeof *lcd);
         rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
         rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
+        med->med_lcd = NULL;
         cfs_mutex_up(&med->med_lcd_lock);
         mdt_trans_stop(env, mdt, th);
 
         cfs_mutex_up(&med->med_lcd_lock);
         mdt_trans_stop(env, mdt, th);
 
+        cfs_spin_lock(&mdt->mdt_client_bitmap_lock);
+        cfs_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
+        cfs_spin_unlock(&mdt->mdt_client_bitmap_lock);
+
         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
                "%s, rc %d\n",  med->med_lr_idx, LAST_RCVD, rc);
         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
                "%s, rc %d\n",  med->med_lr_idx, LAST_RCVD, rc);
+        OBD_FREE_PTR(lcd);
         RETURN(0);
 free:
         RETURN(0);
 free:
+        cfs_mutex_down(&med->med_lcd_lock);
+        med->med_lcd = NULL;
+        cfs_mutex_up(&med->med_lcd_lock);
+        OBD_FREE_PTR(lcd);
         return 0;
 }
 
         return 0;
 }
 
index 8043556..4aed2ae 100644 (file)
@@ -122,7 +122,6 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
-        struct lr_server_data *fsd = class_server_data(exp->exp_obd);
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
@@ -143,19 +142,22 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
-                err = filter_update_server_data(exp->exp_obd);
+                err = filter_update_server_data(exp->exp_obd,
+                                                filter->fo_rcvd_filp,
+                                                filter->fo_fsd);
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1;
-                fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
+                filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         } else {
                 last_rcvd = oti->oti_transno;
         } else {
                 last_rcvd = oti->oti_transno;
-                if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
-                        fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
+                        filter->fo_fsd->lsd_last_transno =
+                                cpu_to_le64(last_rcvd);
         }
         oti->oti_transno = last_rcvd;
 
         }
         oti->oti_transno = last_rcvd;
 
@@ -311,7 +313,6 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
-        struct lr_server_data *fsd = class_server_data(obd);
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
@@ -349,8 +350,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         }
 
         fed->fed_lr_idx = cl_idx;
         }
 
         fed->fed_lr_idx = cl_idx;
-        fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) +
-                          cl_idx * le16_to_cpu(fsd->lsd_client_size);
+        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
+                cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
@@ -375,7 +376,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
-                        fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch;
+                        fed->fed_lcd->lcd_last_epoch =
+                                              filter->fo_fsd->lsd_start_epoch;
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
@@ -404,7 +406,9 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static int filter_client_del(struct obd_export *exp)
+struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
+
+static int filter_client_free(struct obd_export *exp)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
@@ -449,13 +453,12 @@ static int filter_client_del(struct obd_export *exp)
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
-        filter_update_server_data(obd);
+        filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
-        memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid);
-        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp,
-                                 fed->fed_lcd,
-                                 sizeof(*fed->fed_lcd), &off, 0);
+        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd,
+                                 sizeof(zero_lcd), &off, 0);
+        fed->fed_lcd = NULL;
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
@@ -463,8 +466,21 @@ static int filter_client_del(struct obd_export *exp)
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
-        EXIT;
+
+        if (!cfs_test_and_clear_bit(fed->fed_lr_idx,
+                                    filter->fo_last_rcvd_slots)) {
+                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+                       fed->fed_lr_idx);
+                LBUG();
+        }
+        OBD_FREE_PTR(lcd);
+        RETURN(0);
 free:
 free:
+        cfs_mutex_down(&fed->fed_lcd_lock);
+        fed->fed_lcd = NULL;
+        cfs_mutex_up(&fed->fed_lcd_lock);
+        OBD_FREE_PTR(lcd);
+
         return 0;
 }
 
         return 0;
 }
 
@@ -654,16 +670,17 @@ static int filter_init_export(struct obd_export *exp)
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
-        lut_fini(NULL, filter->fo_obt.obt_lut);
-        OBD_FREE_PTR(filter->fo_obt.obt_lut);
+        OBD_FREE_PTR(filter->fo_fsd);
+        filter->fo_fsd = NULL;
+        OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
+        filter->fo_last_rcvd_slots = NULL;
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd)
+int filter_update_server_data(struct obd_device *obd, struct file *filp,
+                              struct lr_server_data *fsd)
 {
 {
-        struct file *filp = obd->u.obt.obt_rcvd_filp;
-        struct lr_server_data *fsd = class_server_data(obd);
         loff_t off = 0;
         int rc;
         ENTRY;
         loff_t off = 0;
         int rc;
         ENTRY;
@@ -716,7 +733,6 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
-        struct lu_target *lut;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
@@ -729,14 +745,17 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
-        /* allocate and initialize lu_target */
-        OBD_ALLOC_PTR(lut);
-        if (lut == NULL)
+        OBD_ALLOC(fsd, sizeof(*fsd));
+        if (!fsd)
                 RETURN(-ENOMEM);
                 RETURN(-ENOMEM);
-        rc = lut_init(NULL, lut, obd, NULL);
-        if (rc)
-                GOTO(err_fsd, rc);
-        fsd = class_server_data(obd);
+        filter->fo_fsd = fsd;
+
+        OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
+        if (filter->fo_last_rcvd_slots == NULL) {
+                OBD_FREE(fsd, sizeof(*fsd));
+                RETURN(-ENOMEM);
+        }
+
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
@@ -910,7 +929,7 @@ out:
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
-        rc = filter_update_server_data(obd);
+        rc = filter_update_server_data(obd, filp, filter->fo_fsd);
         if (rc)
                 GOTO(err_client, rc);
 
         if (rc)
                 GOTO(err_client, rc);
 
@@ -1298,13 +1317,16 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
+        /** lu_target has very limited use in filter now */
+        lut_init(NULL, &filter->fo_lut, obd, NULL);
+
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
-        LASSERT(obd->u.obt.obt_lut);
-        target_recovery_init(obd->u.obt.obt_lut, ost_handle);
+
+        target_recovery_init(&filter->fo_lut, ost_handle);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
@@ -1358,7 +1380,8 @@ static void filter_post(struct obd_device *obd)
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_update_server_data(obd);
+        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
+                                       filter->fo_fsd);
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
@@ -1391,6 +1414,7 @@ static void filter_post(struct obd_device *obd)
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
+        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
@@ -1401,6 +1425,7 @@ static void filter_set_last_id(struct filter_obd *filter,
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
+        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
         LASSERT(filter->fo_last_objids != NULL);
 
         LASSERT(group <= filter->fo_group_count);
         LASSERT(filter->fo_last_objids != NULL);
 
@@ -2043,6 +2068,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
+        cfs_spin_lock_init(&filter->fo_translock);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
@@ -2678,7 +2704,8 @@ static int filter_connect_internal(struct obd_export *exp,
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
-                struct lr_server_data *lsd = class_server_data(exp->exp_obd);
+                struct filter_obd *filter = &exp->exp_obd->u.filter;
+                struct lr_server_data *lsd = filter->fo_fsd;
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
@@ -2688,7 +2715,8 @@ static int filter_connect_internal(struct obd_export *exp,
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
-                        filter_update_server_data(exp->exp_obd);
+                        filter_update_server_data(exp->exp_obd,
+                                                  filter->fo_rcvd_filp, lsd);
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
@@ -2944,13 +2972,12 @@ static void filter_grant_discard(struct obd_export *exp)
 
 static int filter_destroy_export(struct obd_export *exp)
 {
 
 static int filter_destroy_export(struct obd_export *exp)
 {
-        struct filter_export_data *fed = &exp->exp_filter_data;
         ENTRY;
 
         ENTRY;
 
-        if (fed->fed_pending)
+        if (exp->exp_filter_data.fed_pending)
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
-                       exp, fed->fed_pending);
+                       exp, exp->exp_filter_data.fed_pending);
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
@@ -2960,7 +2987,6 @@ static int filter_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
-        lut_client_free(exp);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
@@ -3056,7 +3082,7 @@ static int filter_disconnect(struct obd_export *exp)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
-                filter_client_del(exp);
+                filter_client_free(exp);
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
@@ -3688,7 +3714,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
-                struct lr_server_data *lsd = class_server_data(obd);
+                struct lr_server_data *lsd = filter->fo_fsd;
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||
index 36bf7ae..d3df199 100644 (file)
@@ -138,7 +138,8 @@ __u64 filter_next_id(struct filter_obd *, struct obdo *);
 __u64 filter_last_id(struct filter_obd *, obd_gr group);
 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
                         void *handle, struct obdo *oa);
 __u64 filter_last_id(struct filter_obd *, obd_gr group);
 int filter_update_fidea(struct obd_export *exp, struct inode *inode,
                         void *handle, struct obdo *oa);
-int filter_update_server_data(struct obd_device *);
+int filter_update_server_data(struct obd_device *, struct file *,
+                              struct lr_server_data *);
 int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg,
                         void *option);
 int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg,
                         void *option);
index 2b2d522..4704c20 100644 (file)
@@ -41,7 +41,6 @@
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
-
 /**
  * Update client data in last_rcvd file. An obd API
  */
 /**
  * Update client data in last_rcvd file. An obd API
  */
@@ -49,7 +48,6 @@ static int obt_client_data_update(struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
         struct obd_device_target *obt = &exp->exp_obd->u.obt;
 {
         struct lu_export_data *led = &exp->exp_target_data;
         struct obd_device_target *obt = &exp->exp_obd->u.obt;
-        struct lu_target *lut = class_exp2tgt(exp);
         loff_t off = led->led_lr_off;
         int rc = 0;
 
         loff_t off = led->led_lr_off;
         int rc = 0;
 
@@ -58,7 +56,7 @@ static int obt_client_data_update(struct obd_export *exp)
 
         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
 
         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
-               le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
+               le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
 
         return rc;
 }
 
         return rc;
 }
@@ -66,22 +64,21 @@ static int obt_client_data_update(struct obd_export *exp)
 /**
  * Update server data in last_rcvd file. An obd API
  */
 /**
  * Update server data in last_rcvd file. An obd API
  */
-int obt_server_data_update(struct lu_target *lut, int force_sync)
+int obt_server_data_update(struct obd_device *obd, int force_sync)
 {
 {
-        struct obd_device_target *obt = &lut->lut_obd->u.obt;
+        struct obd_device_target *obt = &obd->u.obt;
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
         loff_t off = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
-               lut->lut_lsd.lsd_uuid,
-               le64_to_cpu(lut->lut_lsd.lsd_mount_count),
-               le64_to_cpu(lut->lut_lsd.lsd_last_transno));
+               obt->obt_lsd->lsd_uuid,
+               le64_to_cpu(obt->obt_lsd->lsd_mount_count),
+               le64_to_cpu(obt->obt_lsd->lsd_last_transno));
 
 
-        rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
-                                 &lut->lut_lsd, sizeof(lut->lut_lsd),
-                                 &off, force_sync);
+        rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
+                                 sizeof(*obt->obt_lsd), &off, force_sync);
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
 
         if (rc)
                 CERROR("error writing lr_server_data: rc = %d\n", rc);
 
@@ -94,32 +91,32 @@ int obt_server_data_update(struct lu_target *lut, int force_sync)
 void obt_client_epoch_update(struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
 void obt_client_epoch_update(struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
-        struct lu_target *lut = class_exp2tgt(exp);
+        struct obd_device_target *obt = &exp->exp_obd->u.obt;
 
         /** VBR: set client last_epoch to current epoch */
         if (le32_to_cpu(lcd->lcd_last_epoch) >=
 
         /** VBR: set client last_epoch to current epoch */
         if (le32_to_cpu(lcd->lcd_last_epoch) >=
-            le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
+            le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
                 return;
                 return;
-        lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
+        lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
         obt_client_data_update(exp);
 }
 
 /**
  * Increment server epoch. An obd API
  */
         obt_client_data_update(exp);
 }
 
 /**
  * Increment server epoch. An obd API
  */
-static void obt_boot_epoch_update(struct lu_target *lut)
+static void obt_boot_epoch_update(struct obd_device *obd)
 {
 {
-        struct obd_device *obd = lut->lut_obd;
         __u32 start_epoch;
         __u32 start_epoch;
+        struct obd_device_target *obt = &obd->u.obt;
         struct ptlrpc_request *req;
         cfs_list_t client_list;
 
         struct ptlrpc_request *req;
         cfs_list_t client_list;
 
-        cfs_spin_lock(&lut->lut_translock);
-        start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
-        lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
+        cfs_spin_lock(&obt->obt_translock);
+        start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
+        obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
                                             LR_EPOCH_BITS);
                                             LR_EPOCH_BITS);
-        lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
-        cfs_spin_unlock(&lut->lut_translock);
+        obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
+        cfs_spin_unlock(&obt->obt_translock);
 
         CFS_INIT_LIST_HEAD(&client_list);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
 
         CFS_INIT_LIST_HEAD(&client_list);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
@@ -138,7 +135,7 @@ static void obt_boot_epoch_update(struct lu_target *lut)
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-        obt_server_data_update(lut, 1);
+        obt_server_data_update(obd, 1);
 }
 
 /**
 }
 
 /**
@@ -171,33 +168,12 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
 }
 
 /**
 }
 
 /**
- * Free in-memory data for client slot related to export.
- */
-void lut_client_free(struct obd_export *exp)
-{
-        struct lu_export_data *led = &exp->exp_target_data;
-        struct lu_target *lut = class_exp2tgt(exp);
-
-        OBD_FREE_PTR(led->led_lcd);
-        led->led_lcd = NULL;
-        /* Clear bit when lcd is freed */
-        spin_lock(&lut->lut_client_bitmap_lock);
-        if (!test_and_clear_bit(led->led_lr_idx, lut->lut_client_bitmap)) {
-                CERROR("%s: client %u bit already clear in bitmap\n",
-                       exp->exp_obd->obd_name, led->led_lr_idx);
-                LBUG();
-        }
-        spin_unlock(&lut->lut_client_bitmap_lock);
-}
-EXPORT_SYMBOL(lut_client_free);
-
-/**
  * Update client data in last_rcvd
  */
  * Update client data in last_rcvd
  */
-int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
+int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
+                            struct obd_export *exp)
 {
         struct lu_export_data *led = &exp->exp_target_data;
 {
         struct lu_export_data *led = &exp->exp_target_data;
-        struct lu_target *lut = class_exp2tgt(exp);
         struct lsd_client_data tmp_lcd;
         loff_t tmp_off = led->led_lr_off;
         struct lu_buf tmp_buf = {
         struct lsd_client_data tmp_lcd;
         loff_t tmp_off = led->led_lr_off;
         struct lu_buf tmp_buf = {
@@ -243,17 +219,17 @@ static int lut_server_data_update(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
+void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
+                             struct obd_export *exp)
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
 {
         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
-        struct lu_target *lut = class_exp2tgt(exp);
 
         LASSERT(lut->lut_bottom);
         /** VBR: set client last_epoch to current epoch */
         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
                 return;
         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
 
         LASSERT(lut->lut_bottom);
         /** VBR: set client last_epoch to current epoch */
         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
                 return;
         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
-        lut_client_data_update(env, exp);
+        lut_client_data_update(env, lut, exp);
 }
 
 /**
 }
 
 /**
@@ -271,7 +247,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
                 return;
         /** Increase server epoch after recovery */
         if (lut->lut_bottom == NULL)
                 return;
         /** Increase server epoch after recovery */
         if (lut->lut_bottom == NULL)
-                return obt_boot_epoch_update(lut);
+                return obt_boot_epoch_update(lut->lut_obd);
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
@@ -301,7 +277,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
         cfs_list_for_each_entry(req, &client_list, rq_list) {
                 LASSERT(!req->rq_export->exp_delayed);
                 if (!req->rq_export->exp_vbr_failed)
         cfs_list_for_each_entry(req, &client_list, rq_list) {
                 LASSERT(!req->rq_export->exp_delayed);
                 if (!req->rq_export->exp_vbr_failed)
-                        lut_client_epoch_update(&env, req->rq_export);
+                        lut_client_epoch_update(&env, lut, req->rq_export);
         }
         /** return list back at once */
         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
         }
         /** return list back at once */
         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
@@ -356,19 +332,13 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        LASSERT(lut);
-        LASSERT(obd);
         lut->lut_obd = obd;
         lut->lut_bottom = dt;
         lut->lut_last_rcvd = NULL;
         lut->lut_obd = obd;
         lut->lut_bottom = dt;
         lut->lut_last_rcvd = NULL;
-        obd->u.obt.obt_lut = lut;
 
         cfs_spin_lock_init(&lut->lut_translock);
         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
 
         cfs_spin_lock_init(&lut->lut_translock);
         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
-
-        OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
-        if (lut->lut_client_bitmap == NULL)
-                RETURN(-ENOMEM);
+        cfs_spin_lock_init(&lut->lut_trans_table_lock);
 
         /** obdfilter has no lu_device stack yet */
         if (dt == NULL)
 
         /** obdfilter has no lu_device stack yet */
         if (dt == NULL)
@@ -377,8 +347,6 @@ int lut_init(const struct lu_env *env, struct lu_target *lut,
         if (!IS_ERR(o)) {
                 lut->lut_last_rcvd = o;
         } else {
         if (!IS_ERR(o)) {
                 lut->lut_last_rcvd = o;
         } else {
-                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
-                lut->lut_client_bitmap = NULL;
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
         }
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
         }
@@ -390,15 +358,9 @@ EXPORT_SYMBOL(lut_init);
 void lut_fini(const struct lu_env *env, struct lu_target *lut)
 {
         ENTRY;
 void lut_fini(const struct lu_env *env, struct lu_target *lut)
 {
         ENTRY;
-
-        if (lut->lut_client_bitmap) {
-                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
-                lut->lut_client_bitmap = NULL;
-        }
-        if (lut->lut_last_rcvd) {
+        if (lut->lut_last_rcvd)
                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
-                lut->lut_last_rcvd = NULL;
-        }
+        lut->lut_last_rcvd = NULL;
         EXIT;
 }
 EXPORT_SYMBOL(lut_fini);
         EXIT;
 }
 EXPORT_SYMBOL(lut_fini);