From bf111a4513db2713d5c43460feb5d520a27c00d6 Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Thu, 20 May 2010 19:03:05 +0400 Subject: [PATCH] b=21485 allocate lcd inside obd_init_export() i=rread i=zam --- lustre/include/lu_target.h | 1 + lustre/mdt/mdt_handler.c | 39 ++++++++++--------- lustre/mdt/mdt_recovery.c | 57 +++++++++++++--------------- lustre/obdfilter/filter.c | 94 +++++++++++++++++++--------------------------- lustre/ptlrpc/target.c | 18 +++++++++ 5 files changed, 104 insertions(+), 105 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 09c98a6..b13a05f 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -74,5 +74,6 @@ void lut_cb_client(struct lu_target *, __u64, void *, int); int lut_init(const struct lu_env *, struct lu_target *, struct obd_device *, struct dt_device *); void lut_fini(const struct lu_env *, struct lu_target *); +int lut_client_alloc(struct obd_export *); void lut_client_free(struct obd_export *); #endif /* __LUSTRE_LU_TARGET_H */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 0b6b12f..a064ca4 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4979,7 +4979,6 @@ static int mdt_obd_connect(const struct lu_env *env, void *localdata) { struct mdt_thread_info *info; - struct lsd_client_data *lcd; struct obd_export *lexp; struct lustre_handle conn = { 0 }; struct mdt_device *mdt; @@ -5008,28 +5007,25 @@ static int mdt_obd_connect(const struct lu_env *env, rc = mdt_connect_internal(lexp, mdt, data); if (rc == 0) { - OBD_ALLOC_PTR(lcd); - if (lcd != NULL) { - struct mdt_thread_info *mti; - mti = lu_context_key_get(&env->le_ctx, - &mdt_thread_key); - LASSERT(mti != NULL); - mti->mti_exp = lexp; - memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); - lexp->exp_target_data.ted_lcd = lcd; - rc = mdt_client_new(env, mdt); - if (rc == 0) - mdt_export_stats_init(obd, lexp, localdata); - } else { - rc = -ENOMEM; - } + struct mdt_thread_info *mti; + struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; + LASSERT(lcd); + mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + LASSERT(mti != NULL); + mti->mti_exp = lexp; + memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); + rc = mdt_client_new(env, mdt); + if (rc == 0) + mdt_export_stats_init(obd, lexp, localdata); } out: - if (rc != 0) + if (rc != 0) { class_disconnect(lexp); - else + *exp = NULL; + } else { *exp = lexp; + } RETURN(rc); } @@ -5176,7 +5172,10 @@ static int mdt_init_export(struct obd_export *exp) cfs_spin_lock(&exp->exp_lock); exp->exp_connecting = 1; cfs_spin_unlock(&exp->exp_lock); - rc = ldlm_init_export(exp); + rc = lut_client_alloc(exp); + if (rc == 0) + rc = ldlm_init_export(exp); + if (rc) CERROR("Error %d while initializing export\n", rc); RETURN(rc); @@ -5194,13 +5193,13 @@ static int mdt_destroy_export(struct obd_export *exp) target_destroy_export(exp); ldlm_destroy_export(exp); + lut_client_free(exp); LASSERT(cfs_list_empty(&exp->exp_outstanding_replies)); LASSERT(cfs_list_empty(&exp->exp_mdt_data.med_open_head)); if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); - lut_client_free(exp); RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index b92727b..6279111 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -241,13 +241,17 @@ static int mdt_clients_data_init(const struct lu_env *env, unsigned long last_size) { struct lr_server_data *lsd = &mdt->mdt_lut.lut_lsd; - struct lsd_client_data *lcd = NULL; + struct lsd_client_data *lcd; struct obd_device *obd = mdt2obd_dev(mdt); loff_t off; int cl_idx; int rc = 0; ENTRY; + OBD_ALLOC_PTR(lcd); + if (!lcd) + RETURN(-ENOMEM); + /* When we do a clean MDS shutdown, we save the last_transno into * the header. If we find clients with higher last_transno values * then those clients may need recovery done. */ @@ -256,12 +260,7 @@ static int mdt_clients_data_init(const struct lu_env *env, off < last_size; cl_idx++) { __u64 last_transno; struct obd_export *exp; - - if (!lcd) { - OBD_ALLOC_PTR(lcd); - if (!lcd) - RETURN(-ENOMEM); - } + struct mdt_thread_info *mti; off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size; @@ -295,30 +294,29 @@ static int mdt_clients_data_init(const struct lu_env *env, if (IS_ERR(exp)) { if (PTR_ERR(exp) == -EALREADY) { /* export already exists, zero out this one */ - lcd->lcd_uuid[0] = '\0'; - } else { - GOTO(err_client, rc = PTR_ERR(exp)); + CERROR("Duplicate export %s!\n", lcd->lcd_uuid); + continue; } - } else { - struct mdt_thread_info *mti; - mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - LASSERT(mti != NULL); - mti->mti_exp = exp; - exp->exp_target_data.ted_lcd = lcd; - rc = mdt_client_add(env, mdt, cl_idx); - /* can't fail existing */ - LASSERTF(rc == 0, "rc = %d\n", rc); - /* VBR: set export last committed version */ - exp->exp_last_committed = last_transno; - lcd = NULL; - cfs_spin_lock(&exp->exp_lock); - exp->exp_connecting = 0; - exp->exp_in_recovery = 0; - cfs_spin_unlock(&exp->exp_lock); - obd->obd_max_recoverable_clients++; - class_export_put(exp); + GOTO(err_client, rc = PTR_ERR(exp)); } + mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + LASSERT(mti != NULL); + mti->mti_exp = exp; + /* copy on-disk lcd to the export */ + *exp->exp_target_data.ted_lcd = *lcd; + rc = mdt_client_add(env, mdt, cl_idx); + /* can't fail existing */ + LASSERTF(rc == 0, "rc = %d\n", rc); + /* VBR: set export last committed version */ + exp->exp_last_committed = last_transno; + cfs_spin_lock(&exp->exp_lock); + exp->exp_connecting = 0; + exp->exp_in_recovery = 0; + cfs_spin_unlock(&exp->exp_lock); + obd->obd_max_recoverable_clients++; + class_export_put(exp); + CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n", cl_idx, last_transno); /* protect __u64 value update */ @@ -329,8 +327,7 @@ static int mdt_clients_data_init(const struct lu_env *env, } err_client: - if (lcd) - OBD_FREE_PTR(lcd); + OBD_FREE_PTR(lcd); RETURN(rc); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index f59732d..1b9078a 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -643,14 +643,18 @@ static void filter_fmd_cleanup(struct obd_export *exp) static int filter_init_export(struct obd_export *exp) { + int rc; cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); cfs_spin_lock(&exp->exp_lock); exp->exp_connecting = 1; cfs_spin_unlock(&exp->exp_lock); + rc = lut_client_alloc(exp); + if (rc == 0) + rc = ldlm_init_export(exp); - return ldlm_init_export(exp); + return rc; } static int filter_free_server_data(struct obd_device_target *obt) @@ -821,18 +825,16 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) GOTO(out, rc = 0); } + OBD_ALLOC_PTR(lcd); + if (!lcd) + GOTO(err_client, rc = -ENOMEM); + for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start); off < last_rcvd_size; cl_idx++) { __u64 last_rcvd; struct obd_export *exp; struct filter_export_data *fed; - if (!lcd) { - OBD_ALLOC_PTR(lcd); - if (!lcd) - GOTO(err_client, rc = -ENOMEM); - } - /* Don't assume off is incremented properly by * fsfilt_read_record(), in case sizeof(*lcd) * isn't the same as lsd->lsd_client_size. */ @@ -855,55 +857,47 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) last_rcvd = le64_to_cpu(lcd->lcd_last_transno); + CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx, + last_rcvd, le64_to_cpu(lsd->lsd_last_transno)); + /* These exports are cleaned up by filter_disconnect(), so they * need to be set up like real exports as filter_connect() does. */ exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid); - - CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx, - last_rcvd, le64_to_cpu(lsd->lsd_last_transno)); if (IS_ERR(exp)) { if (PTR_ERR(exp) == -EALREADY) { /* export already exists, zero out this one */ - CERROR("Zeroing out duplicate export due to " - "bug 10479.\n"); - lcd->lcd_uuid[0] = '\0'; - } else { - GOTO(err_client, rc = PTR_ERR(exp)); + CERROR("Duplicate export %s!\n", lcd->lcd_uuid); + continue; } - } else { - fed = &exp->exp_filter_data; - fed->fed_ted.ted_lcd = lcd; - fed->fed_group = 0; /* will be assigned at connect */ - filter_export_stats_init(obd, exp, NULL); - rc = filter_client_add(obd, exp, cl_idx); - /* can't fail for existing client */ - LASSERTF(rc == 0, "rc = %d\n", rc); - - /* VBR: set export last committed */ - exp->exp_last_committed = last_rcvd; - cfs_spin_lock(&exp->exp_lock); - exp->exp_connecting = 0; - exp->exp_in_recovery = 0; - cfs_spin_unlock(&exp->exp_lock); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_max_recoverable_clients++; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - lcd = NULL; - class_export_put(exp); + OBD_FREE_PTR(lcd); + GOTO(err_client, rc = PTR_ERR(exp)); } - /* Need to check last_rcvd even for duplicated exports. */ - CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n", - cl_idx, last_rcvd); + fed = &exp->exp_filter_data; + *fed->fed_ted.ted_lcd = *lcd; + fed->fed_group = 0; /* will be assigned at connect */ + filter_export_stats_init(obd, exp, NULL); + rc = filter_client_add(obd, exp, cl_idx); + /* can't fail for existing client */ + LASSERTF(rc == 0, "rc = %d\n", rc); + + /* VBR: set export last committed */ + exp->exp_last_committed = last_rcvd; + cfs_spin_lock(&exp->exp_lock); + exp->exp_connecting = 0; + exp->exp_in_recovery = 0; + cfs_spin_unlock(&exp->exp_lock); + cfs_spin_lock_bh(&obd->obd_processing_task_lock); + obd->obd_max_recoverable_clients++; + cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + class_export_put(exp); if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno)) lsd->lsd_last_transno = cpu_to_le64(last_rcvd); } - - if (lcd) - OBD_FREE_PTR(lcd); + OBD_FREE_PTR(lcd); obd->obd_last_committed = le64_to_cpu(lsd->lsd_last_transno); out: @@ -2775,8 +2769,6 @@ static int filter_connect(const struct lu_env *env, struct lvfs_run_ctxt saved; struct lustre_handle conn = { 0 }; struct obd_export *lexp; - struct tg_export_data *ted; - struct lsd_client_data *lcd = NULL; __u32 group; int rc; ENTRY; @@ -2790,22 +2782,15 @@ static int filter_connect(const struct lu_env *env, lexp = class_conn2export(&conn); LASSERT(lexp != NULL); - ted = &lexp->exp_target_data; - rc = filter_connect_internal(lexp, data, 0); if (rc) GOTO(cleanup, rc); filter_export_stats_init(obd, lexp, localdata); if (obd->obd_replayable) { - OBD_ALLOC(lcd, sizeof(*lcd)); - if (!lcd) { - CERROR("filter: out of memory for client data\n"); - GOTO(cleanup, rc = -ENOMEM); - } - + struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; + LASSERT(lcd); memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid)); - ted->ted_lcd = lcd; rc = filter_client_add(obd, lexp, -1); if (rc) GOTO(cleanup, rc); @@ -2960,12 +2945,11 @@ static int filter_destroy_export(struct obd_export *exp) target_destroy_export(exp); ldlm_destroy_export(exp); + lut_client_free(exp); if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); - lut_client_free(exp); - if (!exp->exp_obd->obd_replayable) fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb); diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 94c6e3b..8b9f581 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -171,6 +171,20 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, } /** + * Allocate in-memory data for client slot related to export. + */ +int lut_client_alloc(struct obd_export *exp) +{ + OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd); + if (exp->exp_target_data.ted_lcd == NULL) + RETURN(-ENOMEM); + /* Mark that slot is not yet valid, 0 doesn't work here */ + exp->exp_target_data.ted_lr_idx = -1; + RETURN(0); +} +EXPORT_SYMBOL(lut_client_alloc); + +/** * Free in-memory data for client slot related to export. */ void lut_client_free(struct obd_export *exp) @@ -180,6 +194,10 @@ void lut_client_free(struct obd_export *exp) OBD_FREE_PTR(ted->ted_lcd); ted->ted_lcd = NULL; + + /* Slot may be not yet assigned */ + if (ted->ted_lr_idx < 0) + return; /* Clear bit when lcd is freed */ spin_lock(&lut->lut_client_bitmap_lock); if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) { -- 1.8.3.1