From: Robert Read Date: Tue, 2 Mar 2010 02:08:23 +0000 (-0800) Subject: Revert "b=21485 Keep in-memory client data until export destroy" X-Git-Tag: 1.10.0.38~13 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=d022da725970a8903cafdfcb33fd0274e7d41ed2;p=fs%2Flustre-release.git Revert "b=21485 Keep in-memory client data until export destroy" This reverts commit 64d7c036a5429a9ce5435aea96e30935b1cef44a. Causes compile error: .../lustre/include/lu_target.h:59: error: 'LR_CLIENT_BITMAP_SIZE' undeclared here (not in a function) --- diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index 0a456db6..8141fce 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -217,6 +217,8 @@ struct lustre_mount_data { #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8) #endif +#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long)) + /** COMPAT_146: this is an OST (temporary) */ #define OBD_COMPAT_OST 0x00000002 /** COMPAT_146: this is an MDT (temporary) */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index bedb530..454f383 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -232,7 +232,17 @@ struct obd_device_target { struct super_block *obt_sb; /** last_rcvd file */ struct file *obt_rcvd_filp; - struct lu_target *obt_lut; + /** server data in last_rcvd file */ + struct lr_server_data *obt_lsd; + /** Lock protecting client bitmap */ + cfs_spinlock_t obt_client_bitmap_lock; + /** Bitmap of known clients */ + unsigned long *obt_client_bitmap; + /** Server last transaction number */ + __u64 obt_last_transno; + /** Lock protecting last transaction number */ + cfs_spinlock_t obt_translock; + /** Number of mounts */ __u64 obt_mount_count; cfs_semaphore_t obt_quotachecking; struct lustre_quota_ctxt obt_qctxt; @@ -277,6 +287,7 @@ struct filter_ext { struct filter_obd { /* NB this field MUST be first */ struct obd_device_target fo_obt; + struct lu_target fo_lut; const char *fo_fstype; int fo_group_count; @@ -357,10 +368,11 @@ struct filter_obd { int fo_sec_level; }; -#define fo_translock fo_obt.obt_lut->lut_translock -#define fo_last_rcvd_slots fo_obt.obt_lut->lut_client_bitmap -#define fo_mount_count fo_obt.obt_lut->lut_mount_count +#define fo_translock fo_obt.obt_translock #define fo_rcvd_filp fo_obt.obt_rcvd_filp +#define fo_fsd fo_obt.obt_lsd +#define fo_last_rcvd_slots fo_obt.obt_client_bitmap +#define fo_mount_count fo_obt.obt_mount_count #define fo_vfsmnt fo_obt.obt_vfsmnt struct timeout_item { diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 0a16ece..550e0a8 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -103,6 +103,7 @@ void obd_zombie_impexp_stop(void); void obd_zombie_impexp_cull(void); void obd_zombie_barrier(void); void obd_exports_barrier(struct obd_device *obd); + /* obd_config.c */ int class_process_config(struct lustre_cfg *lcfg); int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars, @@ -271,18 +272,6 @@ static inline enum obd_option exp_flags_from_obd(struct obd_device *obd) 0); } -static inline struct lu_target *class_exp2tgt(struct obd_export *exp) -{ - LASSERT(exp->exp_obd); - return exp->exp_obd->u.obt.obt_lut; -} - -static inline struct lr_server_data *class_server_data(struct obd_device *obd) -{ - LASSERT(obd->u.obt.obt_lut); - return &obd->u.obt.obt_lut->lut_lsd; -} - void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid); void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj); void obdo_from_iattr(struct obdo *oa, struct iattr *attr, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index f8a6f9b..ed96c2b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4498,6 +4498,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); + cfs_spin_lock_init(&m->mdt_transno_lock); + m->mdt_max_mdsize = MAX_MD_SIZE; m->mdt_max_cookiesize = sizeof(struct llog_cookie); m->mdt_som_conf = 0; @@ -4537,6 +4539,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_nosquash_strlen = 0; cfs_init_rwsem(&m->mdt_squash_sem); + cfs_spin_lock_init(&m->mdt_client_bitmap_lock); + OBD_ALLOC_PTR(mite); if (mite == NULL) GOTO(err_lmi, rc = -ENOMEM); @@ -5207,7 +5211,6 @@ static int mdt_destroy_export(struct obd_export *exp) if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); - lut_client_free(exp); RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index f41651a..9c1a885 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -703,15 +703,25 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) GOTO(free, rc = PTR_ERR(th)); cfs_mutex_down(&med->med_lcd_lock); - memset(lcd->lcd_uuid, 0, sizeof lcd->lcd_uuid); + memset(lcd, 0, sizeof *lcd); rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th); + med->med_lcd = NULL; cfs_mutex_up(&med->med_lcd_lock); mdt_trans_stop(env, mdt, th); + cfs_spin_lock(&mdt->mdt_client_bitmap_lock); + cfs_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap); + cfs_spin_unlock(&mdt->mdt_client_bitmap_lock); + CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in " "%s, rc %d\n", med->med_lr_idx, LAST_RCVD, rc); + OBD_FREE_PTR(lcd); RETURN(0); free: + cfs_mutex_down(&med->med_lcd_lock); + med->med_lcd = NULL; + cfs_mutex_up(&med->med_lcd_lock); + OBD_FREE_PTR(lcd); return 0; } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 8043556..4aed2ae 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -122,7 +122,6 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, { struct filter_obd *filter = &exp->exp_obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; - struct lr_server_data *fsd = class_server_data(exp->exp_obd); struct lsd_client_data *lcd; __u64 last_rcvd; loff_t off; @@ -143,19 +142,22 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, cfs_mutex_up(&fed->fed_lcd_lock); CWARN("commit transaction for disconnected client %s: rc %d\n", exp->exp_client_uuid.uuid, rc); - err = filter_update_server_data(exp->exp_obd); + err = filter_update_server_data(exp->exp_obd, + filter->fo_rcvd_filp, + filter->fo_fsd); RETURN(err); } /* we don't allocate new transnos for replayed requests */ cfs_spin_lock(&filter->fo_translock); if (oti->oti_transno == 0) { - last_rcvd = le64_to_cpu(fsd->lsd_last_transno) + 1; - fsd->lsd_last_transno = cpu_to_le64(last_rcvd); + last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1; + filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); } else { last_rcvd = oti->oti_transno; - if (last_rcvd > le64_to_cpu(fsd->lsd_last_transno)) - fsd->lsd_last_transno = cpu_to_le64(last_rcvd); + if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno)) + filter->fo_fsd->lsd_last_transno = + cpu_to_le64(last_rcvd); } oti->oti_transno = last_rcvd; @@ -311,7 +313,6 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, { struct filter_obd *filter = &obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; - struct lr_server_data *fsd = class_server_data(obd); unsigned long *bitmap = filter->fo_last_rcvd_slots; int new_client = (cl_idx == -1); @@ -349,8 +350,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, } fed->fed_lr_idx = cl_idx; - fed->fed_lr_off = le32_to_cpu(fsd->lsd_client_start) + - cl_idx * le16_to_cpu(fsd->lsd_client_size); + fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) + + cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size); cfs_init_mutex(&fed->fed_lcd_lock); LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off); @@ -375,7 +376,8 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, rc = PTR_ERR(handle); CERROR("unable to start transaction: rc %d\n", rc); } else { - fed->fed_lcd->lcd_last_epoch = fsd->lsd_start_epoch; + fed->fed_lcd->lcd_last_epoch = + filter->fo_fsd->lsd_start_epoch; exp->exp_last_request_time = cfs_time_current_sec(); rc = fsfilt_add_journal_cb(obd, 0, handle, target_client_add_cb, @@ -404,7 +406,9 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, RETURN(0); } -static int filter_client_del(struct obd_export *exp) +struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */ + +static int filter_client_free(struct obd_export *exp) { struct filter_export_data *fed = &exp->exp_filter_data; struct filter_obd *filter = &exp->exp_obd->u.filter; @@ -449,13 +453,12 @@ static int filter_client_del(struct obd_export *exp) /* Make sure the server's last_transno is up to date. * This should be done before zeroing client slot so last_transno will * be in server data or in client data in case of failure */ - filter_update_server_data(obd); + filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd); cfs_mutex_down(&fed->fed_lcd_lock); - memset(fed->fed_lcd->lcd_uuid, 0, sizeof fed->fed_lcd->lcd_uuid); - rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, - fed->fed_lcd, - sizeof(*fed->fed_lcd), &off, 0); + rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd, + sizeof(zero_lcd), &off, 0); + fed->fed_lcd = NULL; cfs_mutex_up(&fed->fed_lcd_lock); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); @@ -463,8 +466,21 @@ static int filter_client_del(struct obd_export *exp) "zero out client %s at idx %u/%llu in %s, rc %d\n", lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, LAST_RCVD, rc); - EXIT; + + if (!cfs_test_and_clear_bit(fed->fed_lr_idx, + filter->fo_last_rcvd_slots)) { + CERROR("FILTER client %u: bit already clear in bitmap!!\n", + fed->fed_lr_idx); + LBUG(); + } + OBD_FREE_PTR(lcd); + RETURN(0); free: + cfs_mutex_down(&fed->fed_lcd_lock); + fed->fed_lcd = NULL; + cfs_mutex_up(&fed->fed_lcd_lock); + OBD_FREE_PTR(lcd); + return 0; } @@ -654,16 +670,17 @@ static int filter_init_export(struct obd_export *exp) static int filter_free_server_data(struct filter_obd *filter) { - lut_fini(NULL, filter->fo_obt.obt_lut); - OBD_FREE_PTR(filter->fo_obt.obt_lut); + OBD_FREE_PTR(filter->fo_fsd); + filter->fo_fsd = NULL; + OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8); + filter->fo_last_rcvd_slots = NULL; return 0; } /* assumes caller is already in kernel ctxt */ -int filter_update_server_data(struct obd_device *obd) +int filter_update_server_data(struct obd_device *obd, struct file *filp, + struct lr_server_data *fsd) { - struct file *filp = obd->u.obt.obt_rcvd_filp; - struct lr_server_data *fsd = class_server_data(obd); loff_t off = 0; int rc; ENTRY; @@ -716,7 +733,6 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) struct lsd_client_data *lcd = NULL; struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = i_size_read(inode); - struct lu_target *lut; __u64 mount_count; __u32 start_epoch; int cl_idx; @@ -729,14 +745,17 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) CLASSERT (offsetof(struct lsd_client_data, lcd_padding) + sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE); - /* allocate and initialize lu_target */ - OBD_ALLOC_PTR(lut); - if (lut == NULL) + OBD_ALLOC(fsd, sizeof(*fsd)); + if (!fsd) RETURN(-ENOMEM); - rc = lut_init(NULL, lut, obd, NULL); - if (rc) - GOTO(err_fsd, rc); - fsd = class_server_data(obd); + filter->fo_fsd = fsd; + + OBD_ALLOC(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8); + if (filter->fo_last_rcvd_slots == NULL) { + OBD_FREE(fsd, sizeof(*fsd)); + RETURN(-ENOMEM); + } + if (last_rcvd_size == 0) { LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name); @@ -910,7 +929,7 @@ out: fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count); /* save it, so mount count and last_transno is current */ - rc = filter_update_server_data(obd); + rc = filter_update_server_data(obd, filp, filter->fo_fsd); if (rc) GOTO(err_client, rc); @@ -1298,13 +1317,16 @@ static int filter_prep(struct obd_device *obd) GOTO(err_filp, rc = -EOPNOTSUPP); } + /** lu_target has very limited use in filter now */ + lut_init(NULL, &filter->fo_lut, obd, NULL); + rc = filter_init_server_data(obd, file); if (rc) { CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc); GOTO(err_filp, rc); } - LASSERT(obd->u.obt.obt_lut); - target_recovery_init(obd->u.obt.obt_lut, ost_handle); + + target_recovery_init(&filter->fo_lut, ost_handle); /* open and create health check io file*/ file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644); @@ -1358,7 +1380,8 @@ static void filter_post(struct obd_device *obd) * from lastobjid */ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = filter_update_server_data(obd); + rc = filter_update_server_data(obd, filter->fo_rcvd_filp, + filter->fo_fsd); if (rc) CERROR("error writing server data: rc = %d\n", rc); @@ -1391,6 +1414,7 @@ static void filter_post(struct obd_device *obd) static void filter_set_last_id(struct filter_obd *filter, obd_id id, obd_gr group) { + LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); cfs_spin_lock(&filter->fo_objidlock); @@ -1401,6 +1425,7 @@ static void filter_set_last_id(struct filter_obd *filter, obd_id filter_last_id(struct filter_obd *filter, obd_gr group) { obd_id id; + LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); LASSERT(filter->fo_last_objids != NULL); @@ -2043,6 +2068,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, for (i = 0; i < 32; i++) cfs_sema_init(&filter->fo_create_locks[i], 1); + cfs_spin_lock_init(&filter->fo_translock); cfs_spin_lock_init(&filter->fo_objidlock); CFS_INIT_LIST_HEAD(&filter->fo_export_list); cfs_sema_init(&filter->fo_alloc_lock, 1); @@ -2678,7 +2704,8 @@ static int filter_connect_internal(struct obd_export *exp, } if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { - struct lr_server_data *lsd = class_server_data(exp->exp_obd); + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct lr_server_data *lsd = filter->fo_fsd; int index = le32_to_cpu(lsd->lsd_ost_index); if (!(lsd->lsd_feature_compat & @@ -2688,7 +2715,8 @@ static int filter_connect_internal(struct obd_export *exp, lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST); /* sync is not needed here as filter_client_add will * set exp_need_sync flag */ - filter_update_server_data(exp->exp_obd); + filter_update_server_data(exp->exp_obd, + filter->fo_rcvd_filp, lsd); } else if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" @@ -2944,13 +2972,12 @@ static void filter_grant_discard(struct obd_export *exp) static int filter_destroy_export(struct obd_export *exp) { - struct filter_export_data *fed = &exp->exp_filter_data; ENTRY; - if (fed->fed_pending) + if (exp->exp_filter_data.fed_pending) CERROR("%s: cli %s/%p has %lu pending on destroyed export\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, - exp, fed->fed_pending); + exp, exp->exp_filter_data.fed_pending); lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); @@ -2960,7 +2987,6 @@ static int filter_destroy_export(struct obd_export *exp) if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); - lut_client_free(exp); if (!exp->exp_obd->obd_replayable) fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb); @@ -3056,7 +3082,7 @@ static int filter_disconnect(struct obd_export *exp) rc = server_disconnect_export(exp); if (exp->exp_obd->obd_replayable) - filter_client_del(exp); + filter_client_free(exp); else fsfilt_sync(obd, obd->u.obt.obt_sb); @@ -3688,7 +3714,7 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, osfs->os_bsize - 1) >> blockbits)); if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) { - struct lr_server_data *lsd = class_server_data(obd); + struct lr_server_data *lsd = filter->fo_fsd; int index = le32_to_cpu(lsd->lsd_ost_index); if (obd_fail_val == -1 || diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 36bf7ae..d3df199 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -138,7 +138,8 @@ __u64 filter_next_id(struct filter_obd *, struct obdo *); __u64 filter_last_id(struct filter_obd *, obd_gr group); int filter_update_fidea(struct obd_export *exp, struct inode *inode, void *handle, struct obdo *oa); -int filter_update_server_data(struct obd_device *); +int filter_update_server_data(struct obd_device *, struct file *, + struct lr_server_data *); int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, struct lustre_cfg *lcfg, void *option); diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 2b2d522..4704c20 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -41,7 +41,6 @@ #include #include - /** * Update client data in last_rcvd file. An obd API */ @@ -49,7 +48,6 @@ static int obt_client_data_update(struct obd_export *exp) { struct lu_export_data *led = &exp->exp_target_data; struct obd_device_target *obt = &exp->exp_obd->u.obt; - struct lu_target *lut = class_exp2tgt(exp); loff_t off = led->led_lr_off; int rc = 0; @@ -58,7 +56,7 @@ static int obt_client_data_update(struct obd_export *exp) CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n", led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch), - le32_to_cpu(lut->lut_lsd.lsd_start_epoch)); + le32_to_cpu(obt->obt_lsd->lsd_start_epoch)); return rc; } @@ -66,22 +64,21 @@ static int obt_client_data_update(struct obd_export *exp) /** * Update server data in last_rcvd file. An obd API */ -int obt_server_data_update(struct lu_target *lut, int force_sync) +int obt_server_data_update(struct obd_device *obd, int force_sync) { - struct obd_device_target *obt = &lut->lut_obd->u.obt; + struct obd_device_target *obt = &obd->u.obt; loff_t off = 0; int rc; ENTRY; CDEBUG(D_SUPER, "%s: mount_count is "LPU64", last_transno is "LPU64"\n", - lut->lut_lsd.lsd_uuid, - le64_to_cpu(lut->lut_lsd.lsd_mount_count), - le64_to_cpu(lut->lut_lsd.lsd_last_transno)); + obt->obt_lsd->lsd_uuid, + le64_to_cpu(obt->obt_lsd->lsd_mount_count), + le64_to_cpu(obt->obt_lsd->lsd_last_transno)); - rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp, - &lut->lut_lsd, sizeof(lut->lut_lsd), - &off, force_sync); + rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd, + sizeof(*obt->obt_lsd), &off, force_sync); if (rc) CERROR("error writing lr_server_data: rc = %d\n", rc); @@ -94,32 +91,32 @@ int obt_server_data_update(struct lu_target *lut, int force_sync) void obt_client_epoch_update(struct obd_export *exp) { struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; - struct lu_target *lut = class_exp2tgt(exp); + struct obd_device_target *obt = &exp->exp_obd->u.obt; /** VBR: set client last_epoch to current epoch */ if (le32_to_cpu(lcd->lcd_last_epoch) >= - le32_to_cpu(lut->lut_lsd.lsd_start_epoch)) + le32_to_cpu(obt->obt_lsd->lsd_start_epoch)) return; - lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; + lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch; obt_client_data_update(exp); } /** * Increment server epoch. An obd API */ -static void obt_boot_epoch_update(struct lu_target *lut) +static void obt_boot_epoch_update(struct obd_device *obd) { - struct obd_device *obd = lut->lut_obd; __u32 start_epoch; + struct obd_device_target *obt = &obd->u.obt; struct ptlrpc_request *req; cfs_list_t client_list; - cfs_spin_lock(&lut->lut_translock); - start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1; - lut->lut_last_transno = cpu_to_le64((__u64)start_epoch << + cfs_spin_lock(&obt->obt_translock); + start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1; + obt->obt_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS); - lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch); - cfs_spin_unlock(&lut->lut_translock); + obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch); + cfs_spin_unlock(&obt->obt_translock); CFS_INIT_LIST_HEAD(&client_list); cfs_spin_lock_bh(&obd->obd_processing_task_lock); @@ -138,7 +135,7 @@ static void obt_boot_epoch_update(struct lu_target *lut) cfs_spin_lock_bh(&obd->obd_processing_task_lock); cfs_list_splice_init(&client_list, &obd->obd_final_req_queue); cfs_spin_unlock_bh(&obd->obd_processing_task_lock); - obt_server_data_update(lut, 1); + obt_server_data_update(obd, 1); } /** @@ -171,33 +168,12 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, } /** - * Free in-memory data for client slot related to export. - */ -void lut_client_free(struct obd_export *exp) -{ - struct lu_export_data *led = &exp->exp_target_data; - struct lu_target *lut = class_exp2tgt(exp); - - OBD_FREE_PTR(led->led_lcd); - led->led_lcd = NULL; - /* Clear bit when lcd is freed */ - spin_lock(&lut->lut_client_bitmap_lock); - if (!test_and_clear_bit(led->led_lr_idx, lut->lut_client_bitmap)) { - CERROR("%s: client %u bit already clear in bitmap\n", - exp->exp_obd->obd_name, led->led_lr_idx); - LBUG(); - } - spin_unlock(&lut->lut_client_bitmap_lock); -} -EXPORT_SYMBOL(lut_client_free); - -/** * Update client data in last_rcvd */ -int lut_client_data_update(const struct lu_env *env, struct obd_export *exp) +int lut_client_data_update(const struct lu_env *env, struct lu_target *lut, + struct obd_export *exp) { struct lu_export_data *led = &exp->exp_target_data; - struct lu_target *lut = class_exp2tgt(exp); struct lsd_client_data tmp_lcd; loff_t tmp_off = led->led_lr_off; struct lu_buf tmp_buf = { @@ -243,17 +219,17 @@ static int lut_server_data_update(const struct lu_env *env, RETURN(rc); } -void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp) +void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut, + struct obd_export *exp) { struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; - struct lu_target *lut = class_exp2tgt(exp); LASSERT(lut->lut_bottom); /** VBR: set client last_epoch to current epoch */ if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch) return; lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; - lut_client_data_update(env, exp); + lut_client_data_update(env, lut, exp); } /** @@ -271,7 +247,7 @@ void lut_boot_epoch_update(struct lu_target *lut) return; /** Increase server epoch after recovery */ if (lut->lut_bottom == NULL) - return obt_boot_epoch_update(lut); + return obt_boot_epoch_update(lut->lut_obd); rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) { @@ -301,7 +277,7 @@ void lut_boot_epoch_update(struct lu_target *lut) cfs_list_for_each_entry(req, &client_list, rq_list) { LASSERT(!req->rq_export->exp_delayed); if (!req->rq_export->exp_vbr_failed) - lut_client_epoch_update(&env, req->rq_export); + lut_client_epoch_update(&env, lut, req->rq_export); } /** return list back at once */ cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); @@ -356,19 +332,13 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, int rc = 0; ENTRY; - LASSERT(lut); - LASSERT(obd); lut->lut_obd = obd; lut->lut_bottom = dt; lut->lut_last_rcvd = NULL; - obd->u.obt.obt_lut = lut; cfs_spin_lock_init(&lut->lut_translock); cfs_spin_lock_init(&lut->lut_client_bitmap_lock); - - OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); - if (lut->lut_client_bitmap == NULL) - RETURN(-ENOMEM); + cfs_spin_lock_init(&lut->lut_trans_table_lock); /** obdfilter has no lu_device stack yet */ if (dt == NULL) @@ -377,8 +347,6 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, if (!IS_ERR(o)) { lut->lut_last_rcvd = o; } else { - OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); - lut->lut_client_bitmap = NULL; rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); } @@ -390,15 +358,9 @@ EXPORT_SYMBOL(lut_init); void lut_fini(const struct lu_env *env, struct lu_target *lut) { ENTRY; - - if (lut->lut_client_bitmap) { - OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); - lut->lut_client_bitmap = NULL; - } - if (lut->lut_last_rcvd) { + if (lut->lut_last_rcvd) lu_object_put(env, &lut->lut_last_rcvd->do_lu); - lut->lut_last_rcvd = NULL; - } + lut->lut_last_rcvd = NULL; EXIT; } EXPORT_SYMBOL(lut_fini);