Whamcloud - gitweb
b=22273 Don't remove lcd in case of client_add failure
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 94c66d6..2e5806c 100644 (file)
@@ -122,6 +122,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
 {
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(exp->exp_obd);
         struct lsd_client_data *lcd;
         __u64 last_rcvd;
         loff_t off;
@@ -142,22 +143,19 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                 cfs_mutex_up(&fed->fed_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       exp->exp_client_uuid.uuid, rc);
-                err = filter_update_server_data(exp->exp_obd,
-                                                filter->fo_rcvd_filp,
-                                                filter->fo_fsd);
+                err = filter_update_server_data(exp->exp_obd);
                 RETURN(err);
         }
 
         /* we don't allocate new transnos for replayed requests */
         cfs_spin_lock(&filter->fo_translock);
         if (oti->oti_transno == 0) {
-                last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
-                filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1;
+                fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         } else {
                 last_rcvd = oti->oti_transno;
-                if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
-                        filter->fo_fsd->lsd_last_transno =
-                                cpu_to_le64(last_rcvd);
+                if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno))
+                        fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         }
         oti->oti_transno = last_rcvd;
 
@@ -267,7 +265,7 @@ static int filter_export_stats_init(struct obd_device *obd,
 
                 OBD_ALLOC(tmp->nid_brw_stats, sizeof(struct brw_stats));
                 if (tmp->nid_brw_stats == NULL)
-                        RETURN(-ENOMEM);
+                        GOTO(clean, rc = -ENOMEM);
 
                 init_brw_stats(tmp->nid_brw_stats);
                 rc = lprocfs_seq_create(exp->exp_nid_stats->nid_proc, "brw_stats",
@@ -278,27 +276,30 @@ static int filter_export_stats_init(struct obd_device *obd,
 
                 rc = lprocfs_init_rw_stats(obd, &exp->exp_nid_stats->nid_stats);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(clean, rc);
 
                 rc = lprocfs_register_stats(tmp->nid_proc, "stats",
                                             tmp->nid_stats);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(clean, rc);
                 /* Always add in ldlm_stats */
                 tmp->nid_ldlm_stats = 
                         lprocfs_alloc_stats(LDLM_LAST_OPC - LDLM_FIRST_OPC,
                                             LPROCFS_STATS_FLAG_NOPERCPU);
                 if (tmp->nid_ldlm_stats == NULL)
-                        return -ENOMEM;
+                        GOTO(clean, rc = -ENOMEM);
 
                 lprocfs_init_ldlm_stats(tmp->nid_ldlm_stats);
                 rc = lprocfs_register_stats(tmp->nid_proc, "ldlm_stats",
                                             tmp->nid_ldlm_stats);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(clean, rc);
         }
 
         RETURN(0);
+ clean:
+        lprocfs_exp_cleanup(exp);
+        return rc;
 }
 
 /* Add client data to the FILTER.  We use a bitmap to locate a free space
@@ -310,6 +311,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
 {
         struct filter_obd *filter = &obd->u.filter;
         struct filter_export_data *fed = &exp->exp_filter_data;
+        struct lr_server_data *fsd = class_server_data(obd);
         unsigned long *bitmap = filter->fo_last_rcvd_slots;
         int new_client = (cl_idx == -1);
 
@@ -347,8 +349,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         }
 
         fed->fed_lr_idx = cl_idx;
-        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
-                cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
+        fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) +
+                          cl_idx * le16_to_cpu(fsd->lsd_client_size);
         cfs_init_mutex(&fed->fed_lcd_lock);
         LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
@@ -364,6 +366,9 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                 CDEBUG(D_INFO, "writing client lcd at idx %u (%llu) (len %u)\n",
                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_lcd));
 
+                if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
+                        RETURN(-ENOSPC);
+
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 /* Transaction needed to fix bug 1403 */
                 handle = fsfilt_start(obd,
@@ -373,8 +378,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                         rc = PTR_ERR(handle);
                         CERROR("unable to start transaction: rc %d\n", rc);
                 } else {
-                        fed->fed_lcd->lcd_last_epoch =
-                                              filter->fo_fsd->lsd_start_epoch;
+                        fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch;
                         exp->exp_last_request_time = cfs_time_current_sec();
                         rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                    target_client_add_cb,
@@ -403,9 +407,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         RETURN(0);
 }
 
-struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
-
-static int filter_client_free(struct obd_export *exp)
+static int filter_client_del(struct obd_export *exp)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct filter_obd *filter = &exp->exp_obd->u.filter;
@@ -450,12 +452,13 @@ static int filter_client_free(struct obd_export *exp)
         /* Make sure the server's last_transno is up to date.
          * This should be done before zeroing client slot so last_transno will
          * be in server data or in client data in case of failure */
-        filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd);
+        filter_update_server_data(obd);
 
         cfs_mutex_down(&fed->fed_lcd_lock);
-        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd,
-                                 sizeof(zero_lcd), &off, 0);
-        fed->fed_lcd = NULL;
+        memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid);
+        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp,
+                                 fed->fed_lcd,
+                                 sizeof(*fed->fed_lcd), &off, 0);
         cfs_mutex_up(&fed->fed_lcd_lock);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
@@ -463,21 +466,8 @@ static int filter_client_free(struct obd_export *exp)
                "zero out client %s at idx %u/%llu in %s, rc %d\n",
                lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
                LAST_RCVD, rc);
-
-        if (!cfs_test_and_clear_bit(fed->fed_lr_idx,
-                                    filter->fo_last_rcvd_slots)) {
-                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
-                       fed->fed_lr_idx);
-                LBUG();
-        }
-        OBD_FREE_PTR(lcd);
-        RETURN(0);
+        EXIT;
 free:
-        cfs_mutex_down(&fed->fed_lcd_lock);
-        fed->fed_lcd = NULL;
-        cfs_mutex_up(&fed->fed_lcd_lock);
-        OBD_FREE_PTR(lcd);
-
         return 0;
 }
 
@@ -667,17 +657,16 @@ static int filter_init_export(struct obd_export *exp)
 
 static int filter_free_server_data(struct filter_obd *filter)
 {
-        OBD_FREE_PTR(filter->fo_fsd);
-        filter->fo_fsd = NULL;
-        OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        filter->fo_last_rcvd_slots = NULL;
+        lut_fini(NULL, filter->fo_obt.obt_lut);
+        OBD_FREE_PTR(filter->fo_obt.obt_lut);
         return 0;
 }
 
 /* assumes caller is already in kernel ctxt */
-int filter_update_server_data(struct obd_device *obd, struct file *filp,
-                              struct lr_server_data *fsd)
+int filter_update_server_data(struct obd_device *obd)
 {
+        struct file *filp = obd->u.obt.obt_rcvd_filp;
+        struct lr_server_data *fsd = class_server_data(obd);
         loff_t off = 0;
         int rc;
         ENTRY;
@@ -730,6 +719,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         struct lsd_client_data *lcd = NULL;
         struct inode *inode = filp->f_dentry->d_inode;
         unsigned long last_rcvd_size = i_size_read(inode);
+        struct lu_target *lut;
         __u64 mount_count;
         __u32 start_epoch;
         int cl_idx;
@@ -742,17 +732,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
-        OBD_ALLOC(fsd, sizeof(*fsd));
-        if (!fsd)
-                RETURN(-ENOMEM);
-        filter->fo_fsd = fsd;
-
-        OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
-        if (filter->fo_last_rcvd_slots == NULL) {
-                OBD_FREE(fsd, sizeof(*fsd));
+        /* allocate and initialize lu_target */
+        OBD_ALLOC_PTR(lut);
+        if (lut == NULL)
                 RETURN(-ENOMEM);
-        }
-
+        rc = lut_init(NULL, lut, obd, NULL);
+        if (rc)
+                GOTO(err_fsd, rc);
+        fsd = class_server_data(obd);
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
 
@@ -926,7 +913,7 @@ out:
         fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count);
 
         /* save it, so mount count and last_transno is current */
-        rc = filter_update_server_data(obd, filp, filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 GOTO(err_client, rc);
 
@@ -1314,16 +1301,13 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_filp, rc = -EOPNOTSUPP);
         }
 
-        /** lu_target has very limited use in filter now */
-        lut_init(NULL, &filter->fo_lut, obd, NULL);
-
         rc = filter_init_server_data(obd, file);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
                 GOTO(err_filp, rc);
         }
-
-        target_recovery_init(&filter->fo_lut, ost_handle);
+        LASSERT(obd->u.obt.obt_lut);
+        target_recovery_init(obd->u.obt.obt_lut, ost_handle);
 
         /* open and create health check io file*/
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
@@ -1377,8 +1361,7 @@ static void filter_post(struct obd_device *obd)
          * from lastobjid */
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                       filter->fo_fsd);
+        rc = filter_update_server_data(obd);
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
@@ -1411,7 +1394,6 @@ static void filter_post(struct obd_device *obd)
 static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
 
         cfs_spin_lock(&filter->fo_objidlock);
@@ -1422,8 +1404,8 @@ static void filter_set_last_id(struct filter_obd *filter,
 obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
-        LASSERT(filter->fo_fsd != NULL);
         LASSERT(group <= filter->fo_group_count);
+        LASSERT(filter->fo_last_objids != NULL);
 
         /* FIXME: object groups */
         cfs_spin_lock(&filter->fo_objidlock);
@@ -2064,7 +2046,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         for (i = 0; i < 32; i++)
                 cfs_sema_init(&filter->fo_create_locks[i], 1);
 
-        cfs_spin_lock_init(&filter->fo_translock);
         cfs_spin_lock_init(&filter->fo_objidlock);
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         cfs_sema_init(&filter->fo_alloc_lock, 1);
@@ -2134,24 +2115,21 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         }
 
         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
-
-        if (obd->obd_recovering) {
-                LCONSOLE_WARN("OST %s now serving %s (%s%s%s), but will be in "
-                              "recovery for at least %d:%.02d, or until %d "
-                              "client%s reconnect%s.\n",
-                              obd->obd_name, lustre_cfg_string(lcfg, 1),
-                              label ?: "", label ? "/" : "", str,
+        LCONSOLE_INFO("%s: Now serving %s %s%s with recovery %s\n",
+                      obd->obd_name, label ?: str, lmi ? "on " : "",
+                      lmi ? s2lsi(lmi->lmi_sb)->lsi_lmd->lmd_dev : "",
+                      obd->obd_replayable ? "enabled" : "disabled");
+
+        if (obd->obd_recovering)
+                LCONSOLE_WARN("%s: Will be in recovery for at least %d:%.02d, "
+                              "or until %d client%s reconnect%s\n",
+                              obd->obd_name,
                               obd->obd_recovery_timeout / 60,
                               obd->obd_recovery_timeout % 60,
                               obd->obd_max_recoverable_clients,
-                              (obd->obd_max_recoverable_clients == 1) ? "":"s",
-                              (obd->obd_max_recoverable_clients == 1) ? "s":"");
-        } else {
-                LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
-                              "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
-                              label ?: "", label ? "/" : "", str,
-                              obd->obd_replayable ? "enabled" : "disabled");
-        }
+                              (obd->obd_max_recoverable_clients == 1) ? "" : "s",
+                              (obd->obd_max_recoverable_clients == 1) ? "s": "");
+
 
         RETURN(0);
 
@@ -2703,8 +2681,7 @@ static int filter_connect_internal(struct obd_export *exp,
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
-                struct filter_obd *filter = &exp->exp_obd->u.filter;
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(exp->exp_obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (!(lsd->lsd_feature_compat &
@@ -2714,8 +2691,7 @@ static int filter_connect_internal(struct obd_export *exp,
                         lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST);
                         /* sync is not needed here as filter_client_add will
                          * set exp_need_sync flag */
-                        filter_update_server_data(exp->exp_obd,
-                                                  filter->fo_rcvd_filp, lsd);
+                        filter_update_server_data(exp->exp_obd);
                 } else if (index != data->ocd_index) {
                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
                                            " %u doesn't match actual OST index"
@@ -2847,11 +2823,8 @@ static int filter_connect(const struct lu_env *env,
 
 cleanup:
         if (rc) {
-                if (lcd) {
-                        OBD_FREE_PTR(lcd);
-                        fed->fed_lcd = NULL;
-                }
                 class_disconnect(lexp);
+                lprocfs_exp_cleanup(lexp);
                 *exp = NULL;
         } else {
                 *exp = lexp;
@@ -2970,12 +2943,13 @@ static void filter_grant_discard(struct obd_export *exp)
 
 static int filter_destroy_export(struct obd_export *exp)
 {
+        struct filter_export_data *fed = &exp->exp_filter_data;
         ENTRY;
 
-        if (exp->exp_filter_data.fed_pending)
+        if (fed->fed_pending)
                 CERROR("%s: cli %s/%p has %lu pending on destroyed export\n",
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
-                       exp, exp->exp_filter_data.fed_pending);
+                       exp, fed->fed_pending);
 
         lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
 
@@ -2985,6 +2959,7 @@ static int filter_destroy_export(struct obd_export *exp)
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
+        lut_client_free(exp);
 
         if (!exp->exp_obd->obd_replayable)
                 fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb);
@@ -3080,7 +3055,7 @@ static int filter_disconnect(struct obd_export *exp)
         rc = server_disconnect_export(exp);
 
         if (exp->exp_obd->obd_replayable)
-                filter_client_free(exp);
+                filter_client_del(exp);
         else
                 fsfilt_sync(obd, obd->u.obt.obt_sb);
 
@@ -3432,12 +3407,21 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
         filter = &exp->exp_obd->u.filter;
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
+        /*
+         * We need to be atomic against a concurrent write
+         * (which takes the semaphore for reading). fmd_mactime_xid
+         * checks will have no effect if a write request with lower
+         * xid starts just before a setattr and finishes later than
+         * the setattr (see bug 21489, comment 27).
+         */
         if (oa->o_valid &
             (OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME)) {
+                down_write(&dentry->d_inode->i_alloc_sem);
                 fmd = filter_fmd_get(exp, oa->o_id, oa->o_gr);
                 if (fmd && fmd->fmd_mactime_xid < oti->oti_xid)
                         fmd->fmd_mactime_xid = oti->oti_xid;
                 filter_fmd_put(exp, fmd);
+                up_write(&dentry->d_inode->i_alloc_sem);
         }
 
         /* setting objects attributes (including owner/group) */
@@ -3703,7 +3687,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                                  osfs->os_bsize - 1) >> blockbits));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) {
-                struct lr_server_data *lsd = filter->fo_fsd;
+                struct lr_server_data *lsd = class_server_data(obd);
                 int index = le32_to_cpu(lsd->lsd_ost_index);
 
                 if (obd_fail_val == -1 ||