From 5ae89cf509f30f00fb6c2bfb58c85b66a16a9f54 Mon Sep 17 00:00:00 2001 From: Mike Tappro Date: Tue, 2 Mar 2010 17:32:45 -0800 Subject: [PATCH] b=21485 Keep in-memory client data until export destroy Client data on disk is removed upon disconnect but in-memory data is needed while export is still used and must be kept until its destroy. i=adilger i=rread --- lustre/include/lu_target.h | 6 +- lustre/include/lustre_disk.h | 2 - lustre/include/obd.h | 20 ++----- lustre/include/obd_class.h | 13 ++++- lustre/mdt/mdt_handler.c | 5 +- lustre/mdt/mdt_recovery.c | 12 +--- lustre/obdfilter/filter.c | 110 ++++++++++++++----------------------- lustre/obdfilter/filter_internal.h | 3 +- lustre/ptlrpc/target.c | 96 ++++++++++++++++++++++---------- 9 files changed, 130 insertions(+), 137 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 811f47e..09c98a6 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -56,11 +56,9 @@ struct lu_target { /** Lock protecting client bitmap */ cfs_spinlock_t lut_client_bitmap_lock; /** Bitmap of known clients */ - unsigned long lut_client_bitmap[LR_CLIENT_BITMAP_SIZE]; + unsigned long *lut_client_bitmap; /** Number of mounts */ __u64 lut_mount_count; - __u32 lut_stale_export_age; - cfs_spinlock_t lut_trans_table_lock; }; typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno, @@ -76,5 +74,5 @@ void lut_cb_client(struct lu_target *, __u64, void *, int); int lut_init(const struct lu_env *, struct lu_target *, struct obd_device *, struct dt_device *); void lut_fini(const struct lu_env *, struct lu_target *); - +void lut_client_free(struct obd_export *); #endif /* __LUSTRE_LU_TARGET_H */ diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index 8141fce..0a456db6 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -217,8 +217,6 @@ struct lustre_mount_data { #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8) #endif -#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long)) - /** COMPAT_146: this is an OST (temporary) */ #define OBD_COMPAT_OST 0x00000002 /** COMPAT_146: this is an MDT (temporary) */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 454f383..bedb530 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -232,17 +232,7 @@ struct obd_device_target { struct super_block *obt_sb; /** last_rcvd file */ struct file *obt_rcvd_filp; - /** server data in last_rcvd file */ - struct lr_server_data *obt_lsd; - /** Lock protecting client bitmap */ - cfs_spinlock_t obt_client_bitmap_lock; - /** Bitmap of known clients */ - unsigned long *obt_client_bitmap; - /** Server last transaction number */ - __u64 obt_last_transno; - /** Lock protecting last transaction number */ - cfs_spinlock_t obt_translock; - /** Number of mounts */ + struct lu_target *obt_lut; __u64 obt_mount_count; cfs_semaphore_t obt_quotachecking; struct lustre_quota_ctxt obt_qctxt; @@ -287,7 +277,6 @@ struct filter_ext { struct filter_obd { /* NB this field MUST be first */ struct obd_device_target fo_obt; - struct lu_target fo_lut; const char *fo_fstype; int fo_group_count; @@ -368,11 +357,10 @@ struct filter_obd { int fo_sec_level; }; -#define fo_translock fo_obt.obt_translock +#define fo_translock fo_obt.obt_lut->lut_translock +#define fo_last_rcvd_slots fo_obt.obt_lut->lut_client_bitmap +#define fo_mount_count fo_obt.obt_lut->lut_mount_count #define fo_rcvd_filp fo_obt.obt_rcvd_filp -#define fo_fsd fo_obt.obt_lsd -#define fo_last_rcvd_slots fo_obt.obt_client_bitmap -#define fo_mount_count fo_obt.obt_mount_count #define fo_vfsmnt fo_obt.obt_vfsmnt struct timeout_item { diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 550e0a8..0a16ece 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -103,7 +103,6 @@ void obd_zombie_impexp_stop(void); void obd_zombie_impexp_cull(void); void obd_zombie_barrier(void); void obd_exports_barrier(struct obd_device *obd); - /* obd_config.c */ int class_process_config(struct lustre_cfg *lcfg); int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars, @@ -272,6 +271,18 @@ static inline enum obd_option exp_flags_from_obd(struct obd_device *obd) 0); } +static inline struct lu_target *class_exp2tgt(struct obd_export *exp) +{ + LASSERT(exp->exp_obd); + return exp->exp_obd->u.obt.obt_lut; +} + +static inline struct lr_server_data *class_server_data(struct obd_device *obd) +{ + LASSERT(obd->u.obt.obt_lut); + return &obd->u.obt.obt_lut->lut_lsd; +} + void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid); void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj); void obdo_from_iattr(struct obdo *oa, struct iattr *attr, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index ed96c2b..f8a6f9b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4498,8 +4498,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - cfs_spin_lock_init(&m->mdt_transno_lock); - m->mdt_max_mdsize = MAX_MD_SIZE; m->mdt_max_cookiesize = sizeof(struct llog_cookie); m->mdt_som_conf = 0; @@ -4539,8 +4537,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_nosquash_strlen = 0; cfs_init_rwsem(&m->mdt_squash_sem); - cfs_spin_lock_init(&m->mdt_client_bitmap_lock); - OBD_ALLOC_PTR(mite); if (mite == NULL) GOTO(err_lmi, rc = -ENOMEM); @@ -5211,6 +5207,7 @@ static int mdt_destroy_export(struct obd_export *exp) if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); + lut_client_free(exp); RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 9c1a885..f41651a 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -703,25 +703,15 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) GOTO(free, rc = PTR_ERR(th)); cfs_mutex_down(&med->med_lcd_lock); - memset(lcd, 0, sizeof *lcd); + memset(lcd->lcd_uuid, 0, sizeof lcd->lcd_uuid); rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th); - med->med_lcd = NULL; cfs_mutex_up(&med->med_lcd_lock); mdt_trans_stop(env, mdt, th); - cfs_spin_lock(&mdt->mdt_client_bitmap_lock); - cfs_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap); - cfs_spin_unlock(&mdt->mdt_client_bitmap_lock); - CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in " "%s, rc %d\n", med->med_lr_idx, LAST_RCVD, rc); - OBD_FREE_PTR(lcd); RETURN(0); free: - cfs_mutex_down(&med->med_lcd_lock); - med->med_lcd = NULL; - cfs_mutex_up(&med->med_lcd_lock); - OBD_FREE_PTR(lcd); return 0; } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 4aed2ae..8043556 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -122,6 +122,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, { struct filter_obd *filter = &exp->exp_obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; + struct lr_server_data *fsd = class_server_data(exp->exp_obd); struct lsd_client_data *lcd; __u64 last_rcvd; loff_t off; @@ -142,22 +143,19 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, cfs_mutex_up(&fed->fed_lcd_lock); CWARN("commit transaction for disconnected client %s: rc %d\n", exp->exp_client_uuid.uuid, rc); - err = filter_update_server_data(exp->exp_obd, - filter->fo_rcvd_filp, - filter->fo_fsd); + err = filter_update_server_data(exp->exp_obd); RETURN(err); } /* we don't allocate new transnos for replayed requests */ cfs_spin_lock(&filter->fo_translock); if (oti->oti_transno == 0) { - last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1; - filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); + last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1; + fsd->lsd_last_transno = cpu_to_le64(last_rcvd); } else { last_rcvd = oti->oti_transno; - if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno)) - filter->fo_fsd->lsd_last_transno = - cpu_to_le64(last_rcvd); + if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno)) + fsd->lsd_last_transno = cpu_to_le64(last_rcvd); } oti->oti_transno = last_rcvd; @@ -313,6 +311,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, { struct filter_obd *filter = &obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; + struct lr_server_data *fsd = class_server_data(obd); unsigned long *bitmap = filter->fo_last_rcvd_slots; int new_client = (cl_idx == -1); @@ -350,8 +349,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, } fed->fed_lr_idx = cl_idx; - fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) + - cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size); + fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) + + cl_idx * le16_to_cpu(fsd->lsd_client_size); cfs_init_mutex(&fed->fed_lcd_lock); LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off); @@ -376,8 +375,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, rc = PTR_ERR(handle); CERROR("unable to start transaction: rc %d\n", rc); } else { - fed->fed_lcd->lcd_last_epoch = - filter->fo_fsd->lsd_start_epoch; + fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch; exp->exp_last_request_time = cfs_time_current_sec(); rc = fsfilt_add_journal_cb(obd, 0, handle, target_client_add_cb, @@ -406,9 +404,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, RETURN(0); } -struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */ - -static int filter_client_free(struct obd_export *exp) +static int filter_client_del(struct obd_export *exp) { struct filter_export_data *fed = &exp->exp_filter_data; struct filter_obd *filter = &exp->exp_obd->u.filter; @@ -453,12 +449,13 @@ static int filter_client_free(struct obd_export *exp) /* Make sure the server's last_transno is up to date. * This should be done before zeroing client slot so last_transno will * be in server data or in client data in case of failure */ - filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd); + filter_update_server_data(obd); cfs_mutex_down(&fed->fed_lcd_lock); - rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd, - sizeof(zero_lcd), &off, 0); - fed->fed_lcd = NULL; + memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid); + rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, + fed->fed_lcd, + sizeof(*fed->fed_lcd), &off, 0); cfs_mutex_up(&fed->fed_lcd_lock); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); @@ -466,21 +463,8 @@ static int filter_client_free(struct obd_export *exp) "zero out client %s at idx %u/%llu in %s, rc %d\n", lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, LAST_RCVD, rc); - - if (!cfs_test_and_clear_bit(fed->fed_lr_idx, - filter->fo_last_rcvd_slots)) { - CERROR("FILTER client %u: bit already clear in bitmap!!\n", - fed->fed_lr_idx); - LBUG(); - } - OBD_FREE_PTR(lcd); - RETURN(0); + EXIT; free: - cfs_mutex_down(&fed->fed_lcd_lock); - fed->fed_lcd = NULL; - cfs_mutex_up(&fed->fed_lcd_lock); - OBD_FREE_PTR(lcd); - return 0; } @@ -670,17 +654,16 @@ static int filter_init_export(struct obd_export *exp) static int filter_free_server_data(struct filter_obd *filter) { - OBD_FREE_PTR(filter->fo_fsd); - filter->fo_fsd = NULL; - OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8); - filter->fo_last_rcvd_slots = NULL; + lut_fini(NULL, filter->fo_obt.obt_lut); + OBD_FREE_PTR(filter->fo_obt.obt_lut); return 0; } /* assumes caller is already in kernel ctxt */ -int filter_update_server_data(struct obd_device *obd, struct file *filp, - struct lr_server_data *fsd) +int filter_update_server_data(struct obd_device *obd) { + struct file *filp = obd->u.obt.obt_rcvd_filp; + struct lr_server_data *fsd = class_server_data(obd); loff_t off = 0; int rc; ENTRY; @@ -733,6 +716,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) struct lsd_client_data *lcd = NULL; struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = i_size_read(inode); + struct lu_target *lut; __u64 mount_count; __u32 start_epoch; int cl_idx; @@ -745,17 +729,14 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) CLASSERT (offsetof(struct lsd_client_data, lcd_padding) + sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE); - OBD_ALLOC(fsd, sizeof(*fsd)); - if (!fsd) - RETURN(-ENOMEM); - filter->fo_fsd = fsd; - - OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8); - if (filter->fo_last_rcvd_slots == NULL) { - OBD_FREE(fsd, sizeof(*fsd)); + /* allocate and initialize lu_target */ + OBD_ALLOC_PTR(lut); + if (lut == NULL) RETURN(-ENOMEM); - } - + rc = lut_init(NULL, lut, obd, NULL); + if (rc) + GOTO(err_fsd, rc); + fsd = class_server_data(obd); if (last_rcvd_size == 0) { LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name); @@ -929,7 +910,7 @@ out: fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count); /* save it, so mount count and last_transno is current */ - rc = filter_update_server_data(obd, filp, filter->fo_fsd); + rc = filter_update_server_data(obd); if (rc) GOTO(err_client, rc); @@ -1317,16 +1298,13 @@ static int filter_prep(struct obd_device *obd) GOTO(err_filp, rc = -EOPNOTSUPP); } - /** lu_target has very limited use in filter now */ - lut_init(NULL, &filter->fo_lut, obd, NULL); - rc = filter_init_server_data(obd, file); if (rc) { CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc); GOTO(err_filp, rc); } - - target_recovery_init(&filter->fo_lut, ost_handle); + LASSERT(obd->u.obt.obt_lut); + target_recovery_init(obd->u.obt.obt_lut, ost_handle); /* open and create health check io file*/ file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644); @@ -1380,8 +1358,7 @@ static void filter_post(struct obd_device *obd) * from lastobjid */ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd); + rc = filter_update_server_data(obd); if (rc) CERROR("error writing server data: rc = %d\n", rc); @@ -1414,7 +1391,6 @@ static void filter_post(struct obd_device *obd) static void filter_set_last_id(struct filter_obd *filter, obd_id id, obd_gr group) { - LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); cfs_spin_lock(&filter->fo_objidlock); @@ -1425,7 +1401,6 @@ static void filter_set_last_id(struct filter_obd *filter, obd_id filter_last_id(struct filter_obd *filter, obd_gr group) { obd_id id; - LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); LASSERT(filter->fo_last_objids != NULL); @@ -2068,7 +2043,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, for (i = 0; i < 32; i++) cfs_sema_init(&filter->fo_create_locks[i], 1); - cfs_spin_lock_init(&filter->fo_translock); cfs_spin_lock_init(&filter->fo_objidlock); CFS_INIT_LIST_HEAD(&filter->fo_export_list); cfs_sema_init(&filter->fo_alloc_lock, 1); @@ -2704,8 +2678,7 @@ static int filter_connect_internal(struct obd_export *exp, } if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { - struct filter_obd *filter = &exp->exp_obd->u.filter; - struct lr_server_data *lsd = filter->fo_fsd; + struct lr_server_data *lsd = class_server_data(exp->exp_obd); int index = le32_to_cpu(lsd->lsd_ost_index); if (!(lsd->lsd_feature_compat & @@ -2715,8 +2688,7 @@ static int filter_connect_internal(struct obd_export *exp, lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST); /* sync is not needed here as filter_client_add will * set exp_need_sync flag */ - filter_update_server_data(exp->exp_obd, - filter->fo_rcvd_filp, lsd); + filter_update_server_data(exp->exp_obd); } else if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" @@ -2972,12 +2944,13 @@ static void filter_grant_discard(struct obd_export *exp) static int filter_destroy_export(struct obd_export *exp) { + struct filter_export_data *fed = &exp->exp_filter_data; ENTRY; - if (exp->exp_filter_data.fed_pending) + if (fed->fed_pending) CERROR("%s: cli %s/%p has %lu pending on destroyed export\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, - exp, exp->exp_filter_data.fed_pending); + exp, fed->fed_pending); lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); @@ -2987,6 +2960,7 @@ static int filter_destroy_export(struct obd_export *exp) if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); + lut_client_free(exp); if (!exp->exp_obd->obd_replayable) fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb); @@ -3082,7 +3056,7 @@ static int filter_disconnect(struct obd_export *exp) rc = server_disconnect_export(exp); if (exp->exp_obd->obd_replayable) - filter_client_free(exp); + filter_client_del(exp); else fsfilt_sync(obd, obd->u.obt.obt_sb); @@ -3714,7 +3688,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, osfs->os_bsize - 1) >> blockbits)); if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) { - struct lr_server_data *lsd = filter->fo_fsd; + struct lr_server_data *lsd = class_server_data(obd); int index = le32_to_cpu(lsd->lsd_ost_index); if (obd_fail_val == -1 || diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index d3df199..36bf7ae 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -138,8 +138,7 @@ __u64 filter_next_id(struct filter_obd *, struct obdo *); __u64 filter_last_id(struct filter_obd *, obd_gr group); int filter_update_fidea(struct obd_export *exp, struct inode *inode, void *handle, struct obdo *oa); -int filter_update_server_data(struct obd_device *, struct file *, - struct lr_server_data *); +int filter_update_server_data(struct obd_device *); int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg, void *option); diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 4704c20..2b2d522 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -41,6 +41,7 @@ #include #include + /** * Update client data in last_rcvd file. An obd API */ @@ -48,6 +49,7 @@ static int obt_client_data_update(struct obd_export *exp) { struct lu_export_data *led = &exp->exp_target_data; struct obd_device_target *obt = &exp->exp_obd->u.obt; + struct lu_target *lut = class_exp2tgt(exp); loff_t off = led->led_lr_off; int rc = 0; @@ -56,7 +58,7 @@ static int obt_client_data_update(struct obd_export *exp) CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n", led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch), - le32_to_cpu(obt->obt_lsd->lsd_start_epoch)); + le32_to_cpu(lut->lut_lsd.lsd_start_epoch)); return rc; } @@ -64,21 +66,22 @@ static int obt_client_data_update(struct obd_export *exp) /** * Update server data in last_rcvd file. An obd API */ -int obt_server_data_update(struct obd_device *obd, int force_sync) +int obt_server_data_update(struct lu_target *lut, int force_sync) { - struct obd_device_target *obt = &obd->u.obt; + struct obd_device_target *obt = &lut->lut_obd->u.obt; loff_t off = 0; int rc; ENTRY; CDEBUG(D_SUPER, "%s: mount_count is "LPU64", last_transno is "LPU64"\n", - obt->obt_lsd->lsd_uuid, - le64_to_cpu(obt->obt_lsd->lsd_mount_count), - le64_to_cpu(obt->obt_lsd->lsd_last_transno)); + lut->lut_lsd.lsd_uuid, + le64_to_cpu(lut->lut_lsd.lsd_mount_count), + le64_to_cpu(lut->lut_lsd.lsd_last_transno)); - rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd, - sizeof(*obt->obt_lsd), &off, force_sync); + rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp, + &lut->lut_lsd, sizeof(lut->lut_lsd), + &off, force_sync); if (rc) CERROR("error writing lr_server_data: rc = %d\n", rc); @@ -91,32 +94,32 @@ int obt_server_data_update(struct obd_device *obd, int force_sync) void obt_client_epoch_update(struct obd_export *exp) { struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; - struct obd_device_target *obt = &exp->exp_obd->u.obt; + struct lu_target *lut = class_exp2tgt(exp); /** VBR: set client last_epoch to current epoch */ if (le32_to_cpu(lcd->lcd_last_epoch) >= - le32_to_cpu(obt->obt_lsd->lsd_start_epoch)) + le32_to_cpu(lut->lut_lsd.lsd_start_epoch)) return; - lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch; + lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; obt_client_data_update(exp); } /** * Increment server epoch. An obd API */ -static void obt_boot_epoch_update(struct obd_device *obd) +static void obt_boot_epoch_update(struct lu_target *lut) { + struct obd_device *obd = lut->lut_obd; __u32 start_epoch; - struct obd_device_target *obt = &obd->u.obt; struct ptlrpc_request *req; cfs_list_t client_list; - cfs_spin_lock(&obt->obt_translock); - start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1; - obt->obt_last_transno = cpu_to_le64((__u64)start_epoch << + cfs_spin_lock(&lut->lut_translock); + start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1; + lut->lut_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS); - obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch); - cfs_spin_unlock(&obt->obt_translock); + lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch); + cfs_spin_unlock(&lut->lut_translock); CFS_INIT_LIST_HEAD(&client_list); cfs_spin_lock_bh(&obd->obd_processing_task_lock); @@ -135,7 +138,7 @@ static void obt_boot_epoch_update(struct obd_device *obd) cfs_spin_lock_bh(&obd->obd_processing_task_lock); cfs_list_splice_init(&client_list, &obd->obd_final_req_queue); cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - obt_server_data_update(obd, 1); + obt_server_data_update(lut, 1); } /** @@ -168,12 +171,33 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, } /** + * Free in-memory data for client slot related to export. + */ +void lut_client_free(struct obd_export *exp) +{ + struct lu_export_data *led = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); + + OBD_FREE_PTR(led->led_lcd); + led->led_lcd = NULL; + /* Clear bit when lcd is freed */ + spin_lock(&lut->lut_client_bitmap_lock); + if (!test_and_clear_bit(led->led_lr_idx, lut->lut_client_bitmap)) { + CERROR("%s: client %u bit already clear in bitmap\n", + exp->exp_obd->obd_name, led->led_lr_idx); + LBUG(); + } + spin_unlock(&lut->lut_client_bitmap_lock); +} +EXPORT_SYMBOL(lut_client_free); + +/** * Update client data in last_rcvd */ -int lut_client_data_update(const struct lu_env *env, struct lu_target *lut, - struct obd_export *exp) +int lut_client_data_update(const struct lu_env *env, struct obd_export *exp) { struct lu_export_data *led = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); struct lsd_client_data tmp_lcd; loff_t tmp_off = led->led_lr_off; struct lu_buf tmp_buf = { @@ -219,17 +243,17 @@ static int lut_server_data_update(const struct lu_env *env, RETURN(rc); } -void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut, - struct obd_export *exp) +void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp) { struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; + struct lu_target *lut = class_exp2tgt(exp); LASSERT(lut->lut_bottom); /** VBR: set client last_epoch to current epoch */ if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch) return; lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; - lut_client_data_update(env, lut, exp); + lut_client_data_update(env, exp); } /** @@ -247,7 +271,7 @@ void lut_boot_epoch_update(struct lu_target *lut) return; /** Increase server epoch after recovery */ if (lut->lut_bottom == NULL) - return obt_boot_epoch_update(lut->lut_obd); + return obt_boot_epoch_update(lut); rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) { @@ -277,7 +301,7 @@ void lut_boot_epoch_update(struct lu_target *lut) cfs_list_for_each_entry(req, &client_list, rq_list) { LASSERT(!req->rq_export->exp_delayed); if (!req->rq_export->exp_vbr_failed) - lut_client_epoch_update(&env, lut, req->rq_export); + lut_client_epoch_update(&env, req->rq_export); } /** return list back at once */ cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); @@ -332,13 +356,19 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, int rc = 0; ENTRY; + LASSERT(lut); + LASSERT(obd); lut->lut_obd = obd; lut->lut_bottom = dt; lut->lut_last_rcvd = NULL; + obd->u.obt.obt_lut = lut; cfs_spin_lock_init(&lut->lut_translock); cfs_spin_lock_init(&lut->lut_client_bitmap_lock); - cfs_spin_lock_init(&lut->lut_trans_table_lock); + + OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + if (lut->lut_client_bitmap == NULL) + RETURN(-ENOMEM); /** obdfilter has no lu_device stack yet */ if (dt == NULL) @@ -347,6 +377,8 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, if (!IS_ERR(o)) { lut->lut_last_rcvd = o; } else { + OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + lut->lut_client_bitmap = NULL; rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); } @@ -358,9 +390,15 @@ EXPORT_SYMBOL(lut_init); void lut_fini(const struct lu_env *env, struct lu_target *lut) { ENTRY; - if (lut->lut_last_rcvd) + + if (lut->lut_client_bitmap) { + OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + lut->lut_client_bitmap = NULL; + } + if (lut->lut_last_rcvd) { lu_object_put(env, &lut->lut_last_rcvd->do_lu); - lut->lut_last_rcvd = NULL; + lut->lut_last_rcvd = NULL; + } EXIT; } EXPORT_SYMBOL(lut_fini); -- 1.8.3.1