Whamcloud - gitweb
b=21485 Keep in-memory client data until export destroy
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 4aed2ae..8043556 100644 (file)
@@ -122,6 +122,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(exp->exp_obd);
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
@@ -142,22 +143,19 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
-                err = filter_update_server_data(exp->exp_obd,
-                                                filter->fo_rcvd_filp,
-                                                filter->fo_fsd);
+                err = filter_update_server_data(exp->exp_obd);
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
-                filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1;
+                fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         } else {
                 last_rcvd = oti->oti_transno;
-                if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
-                        filter->fo_fsd->lsd_last_transno =
-                                cpu_to_le64(last_rcvd);
+                if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
+                        fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         }
         oti->oti_transno = last_rcvd;
 
@@ -313,6 +311,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(obd);
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
@@ -350,8 +349,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         }
 
         fed->fed_lr_idx = cl_idx;
-        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
-                cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
+        fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) +
+                          cl_idx * le16_to_cpu(fsd->lsd_client_size);
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
@@ -376,8 +375,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
-                        fed->fed_lcd->lcd_last_epoch =
-                                              filter->fo_fsd->lsd_start_epoch;
+                        fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch;
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
@@ -406,9 +404,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         RETURN(0);
 }
 
-struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
-
-static int filter_client_free(struct obd_export *exp)
+static int filter_client_del(struct obd_export *exp)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
@@ -453,12 +449,13 @@ static int filter_client_free(struct obd_export *exp)
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
-        filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd);
+        filter_update_server_data(obd);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
-        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd,
-                                 sizeof(zero_lcd), &off, 0);
-        fed->fed_lcd = NULL;
+        memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid);
+        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+                                 fed->fed_lcd,
+                                 sizeof(*fed->fed_lcd), &off, 0);
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
@@ -466,21 +463,8 @@ static int filter_client_free(struct obd_export *exp)
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
-
-        if (!cfs_test_and_clear_bit(fed->fed_lr_idx,
-                                    filter->fo_last_rcvd_slots)) {
-                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
-                       fed->fed_lr_idx);
-                LBUG();
-        }
-        OBD_FREE_PTR(lcd);
-        RETURN(0);
+        EXIT;
 free:
-        cfs_mutex_down(&fed->fed_lcd_lock);
-        fed->fed_lcd = NULL;
-        cfs_mutex_up(&fed->fed_lcd_lock);
-        OBD_FREE_PTR(lcd);
-
         return 0;
 }
 
@@ -670,17 +654,16 @@ static int filter_init_export(struct obd_export *exp)
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
-        OBD_FREE_PTR(filter->fo_fsd);
-        filter->fo_fsd = NULL;
-        OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        filter->fo_last_rcvd_slots = NULL;
+        lut_fini(NULL, filter->fo_obt.obt_lut);
+        OBD_FREE_PTR(filter->fo_obt.obt_lut);
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd, struct file *filp,
-                              struct lr_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd)
 {
+        struct file *filp = obd->u.obt.obt_rcvd_filp;
+        struct lr_server_data *fsd = class_server_data(obd);
         loff_t off = 0;
         int rc;
         ENTRY;
@@ -733,6 +716,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
+        struct lu_target *lut;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
@@ -745,17 +729,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
-        OBD_ALLOC(fsd, sizeof(*fsd));
-        if (!fsd)
-                RETURN(-ENOMEM);
-        filter->fo_fsd = fsd;
-
-        OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        if (filter->fo_last_rcvd_slots == NULL) {
-                OBD_FREE(fsd, sizeof(*fsd));
+        /* allocate and initialize lu_target */
+        OBD_ALLOC_PTR(lut);
+        if (lut == NULL)
                 RETURN(-ENOMEM);
-        }
-
+        rc = lut_init(NULL, lut, obd, NULL);
+        if (rc)
+                GOTO(err_fsd, rc);
+        fsd = class_server_data(obd);
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
@@ -929,7 +910,7 @@ out:
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
-        rc = filter_update_server_data(obd, filp, filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 GOTO(err_client, rc);
 
@@ -1317,16 +1298,13 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
-        /** lu_target has very limited use in filter now */
-        lut_init(NULL, &filter->fo_lut, obd, NULL);
-
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
-
-        target_recovery_init(&filter->fo_lut, ost_handle);
+        LASSERT(obd->u.obt.obt_lut);
+        target_recovery_init(obd->u.obt.obt_lut, ost_handle);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
@@ -1380,8 +1358,7 @@ static void filter_post(struct obd_device *obd)
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
@@ -1414,7 +1391,6 @@ static void filter_post(struct obd_device *obd)
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
@@ -1425,7 +1401,6 @@ static void filter_set_last_id(struct filter_obd *filter,
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
         LASSERT(filter->fo_last_objids != NULL);
 
@@ -2068,7 +2043,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
-        cfs_spin_lock_init(&filter->fo_translock);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
@@ -2704,8 +2678,7 @@ static int filter_connect_internal(struct obd_export *exp,
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
-                struct filter_obd *filter = &exp->exp_obd->u.filter;
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(exp->exp_obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
@@ -2715,8 +2688,7 @@ static int filter_connect_internal(struct obd_export *exp,
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
-                        filter_update_server_data(exp->exp_obd,
-                                                  filter->fo_rcvd_filp, lsd);
+                        filter_update_server_data(exp->exp_obd);
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
@@ -2972,12 +2944,13 @@ static void filter_grant_discard(struct obd_export *exp)
 
 static int filter_destroy_export(struct obd_export *exp)
 {
+        struct filter_export_data *fed = &exp->exp_filter_data;
         ENTRY;
 
-        if (exp->exp_filter_data.fed_pending)
+        if (fed->fed_pending)
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
-                       exp, exp->exp_filter_data.fed_pending);
+                       exp, fed->fed_pending);
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
@@ -2987,6 +2960,7 @@ static int filter_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
+        lut_client_free(exp);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
@@ -3082,7 +3056,7 @@ static int filter_disconnect(struct obd_export *exp)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
-                filter_client_free(exp);
+                filter_client_del(exp);
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
@@ -3714,7 +3688,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||