X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=13c2530618bc3eee320b65974713eb06fe40e49a;hp=507e8d8d11afc89f43086bfc725131ab6543c05f;hb=e67c6e366752611ffd2baeb7cefa24c9f289eb78;hpb=a928591d58b5d0dbbcc9a7f534dca2b6df22da9e diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 507e8d8..13c2530 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -79,8 +79,6 @@ #include "filter_internal.h" -/* Group 0 is no longer a legal group, to catch uninitialized IDs */ -#define FILTER_MIN_GROUPS FILTER_GROUP_MDS1_N_BASE static struct lvfs_callback_ops filter_lvfs_ops; cfs_mem_cache_t *ll_fmd_cachep; @@ -90,8 +88,7 @@ static void filter_commit_cb(struct obd_device *obd, __u64 transno, struct obd_export *exp = cb_data; LASSERT(exp->exp_obd == obd); obd_transno_commit_cb(obd, transno, exp, error); - atomic_dec(&exp->exp_cb_count); - class_export_put(exp); + class_export_cb_put(exp); } int filter_version_get_check(struct obd_export *exp, @@ -110,9 +107,9 @@ int filter_version_get_check(struct obd_export *exp, oti->oti_pre_version != curr_version) { CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", oti->oti_pre_version, curr_version); - spin_lock(&exp->exp_lock); + cfs_spin_lock(&exp->exp_lock); exp->exp_vbr_failed = 1; - spin_unlock(&exp->exp_lock); + cfs_spin_unlock(&exp->exp_lock); RETURN (-EOVERFLOW); } oti->oti_pre_version = curr_version; @@ -125,7 +122,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, { struct filter_obd *filter = &exp->exp_obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; - struct lsd_client_data *lcd = fed->fed_lcd; + struct lsd_client_data *lcd; __u64 last_rcvd; loff_t off; int err, log_pri = D_RPCTRACE; @@ -137,8 +134,22 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, if (!exp->exp_obd->obd_replayable || oti == NULL) RETURN(rc); + cfs_mutex_down(&fed->fed_lcd_lock); + lcd = fed->fed_lcd; + /* if the export has already been disconnected, we have no last_rcvd slot, + * update server data with latest transno then */ + if (lcd == NULL) { + cfs_mutex_up(&fed->fed_lcd_lock); + CWARN("commit transaction for disconnected client %s: rc %d\n", + exp->exp_client_uuid.uuid, rc); + err = filter_update_server_data(exp->exp_obd, + filter->fo_rcvd_filp, + filter->fo_fsd); + RETURN(err); + } + /* we don't allocate new transnos for replayed requests */ - spin_lock(&filter->fo_translock); + cfs_spin_lock(&filter->fo_translock); if (oti->oti_transno == 0) { last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1; filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); @@ -149,14 +160,12 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, cpu_to_le64(last_rcvd); } oti->oti_transno = last_rcvd; - if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) { - spin_unlock(&filter->fo_translock); - LBUG(); - } + + LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno)); lcd->lcd_last_transno = cpu_to_le64(last_rcvd); lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version); lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid); - spin_unlock(&filter->fo_translock); + cfs_spin_unlock(&filter->fo_translock); if (inode) fsfilt_set_version(exp->exp_obd, inode, last_rcvd); @@ -167,8 +176,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, fed->fed_lr_idx, fed->fed_lr_off); err = -EINVAL; } else { - class_export_get(exp); /* released when the cb is called */ - atomic_inc(&exp->exp_cb_count); + class_export_cb_get(exp); /* released when the cb is called */ if (!force_sync) force_sync = fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, @@ -190,7 +198,7 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n", last_rcvd, lcd->lcd_uuid, fed->fed_lr_idx, err); - + cfs_mutex_up(&fed->fed_lcd_lock); RETURN(rc); } @@ -208,7 +216,7 @@ static void init_brw_stats(struct brw_stats *brw_stats) { int i; for (i = 0; i < BRW_LAST; i++) - spin_lock_init(&brw_stats->hist[i].oh_lock); + cfs_spin_lock_init(&brw_stats->hist[i].oh_lock); } static int lprocfs_init_rw_stats(struct obd_device *obd, @@ -259,7 +267,7 @@ static int filter_export_stats_init(struct obd_device *obd, OBD_ALLOC(tmp->nid_brw_stats, sizeof(struct brw_stats)); if (tmp->nid_brw_stats == NULL) - RETURN(-ENOMEM); + GOTO(clean, rc = -ENOMEM); init_brw_stats(tmp->nid_brw_stats); rc = lprocfs_seq_create(exp->exp_nid_stats->nid_proc, "brw_stats", @@ -270,27 +278,30 @@ static int filter_export_stats_init(struct obd_device *obd, rc = lprocfs_init_rw_stats(obd, &exp->exp_nid_stats->nid_stats); if (rc) - RETURN(rc); + GOTO(clean, rc); rc = lprocfs_register_stats(tmp->nid_proc, "stats", tmp->nid_stats); if (rc) - RETURN(rc); + GOTO(clean, rc); /* Always add in ldlm_stats */ tmp->nid_ldlm_stats = lprocfs_alloc_stats(LDLM_LAST_OPC - LDLM_FIRST_OPC, LPROCFS_STATS_FLAG_NOPERCPU); if (tmp->nid_ldlm_stats == NULL) - return -ENOMEM; + GOTO(clean, rc = -ENOMEM); lprocfs_init_ldlm_stats(tmp->nid_ldlm_stats); rc = lprocfs_register_stats(tmp->nid_proc, "ldlm_stats", tmp->nid_ldlm_stats); if (rc) - RETURN(rc); + GOTO(clean, rc); } RETURN(0); + clean: + lprocfs_exp_cleanup(exp); + return rc; } /* Add client data to the FILTER. We use a bitmap to locate a free space @@ -318,20 +329,20 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, * there's no need for extra complication here */ if (new_client) { - cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS); + cl_idx = cfs_find_first_zero_bit(bitmap, LR_MAX_CLIENTS); repeat: if (cl_idx >= LR_MAX_CLIENTS) { CERROR("no room for %u client - fix LR_MAX_CLIENTS\n", cl_idx); RETURN(-EOVERFLOW); } - if (test_and_set_bit(cl_idx, bitmap)) { - cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, - cl_idx); + if (cfs_test_and_set_bit(cl_idx, bitmap)) { + cl_idx = cfs_find_next_zero_bit(bitmap, LR_MAX_CLIENTS, + cl_idx); goto repeat; } } else { - if (test_and_set_bit(cl_idx, bitmap)) { + if (cfs_test_and_set_bit(cl_idx, bitmap)) { CERROR("FILTER client %d: bit already set in bitmap!\n", cl_idx); LBUG(); @@ -341,6 +352,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, fed->fed_lr_idx = cl_idx; fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) + cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size); + cfs_init_mutex(&fed->fed_lcd_lock); LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off); CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n", @@ -368,11 +380,12 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, filter->fo_fsd->lsd_start_epoch; exp->exp_last_request_time = cfs_time_current_sec(); rc = fsfilt_add_journal_cb(obd, 0, handle, - target_client_add_cb, exp); + target_client_add_cb, + class_export_cb_get(exp)); if (rc == 0) { - spin_lock(&exp->exp_lock); + cfs_spin_lock(&exp->exp_lock); exp->exp_need_sync = 1; - spin_unlock(&exp->exp_lock); + cfs_spin_unlock(&exp->exp_lock); } rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, fed->fed_lcd, @@ -401,15 +414,16 @@ static int filter_client_free(struct obd_export *exp) struct filter_obd *filter = &exp->exp_obd->u.filter; struct obd_device *obd = exp->exp_obd; struct lvfs_run_ctxt saved; + struct lsd_client_data *lcd = fed->fed_lcd; int rc; loff_t off; ENTRY; - if (fed->fed_lcd == NULL) + if (lcd == NULL) RETURN(0); /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ - if (strcmp(fed->fed_lcd->lcd_uuid, obd->obd_uuid.uuid ) == 0) + if (strcmp(lcd->lcd_uuid, obd->obd_uuid.uuid ) == 0) GOTO(free, 0); LASSERT(filter->fo_last_rcvd_slots != NULL); @@ -417,7 +431,7 @@ static int filter_client_free(struct obd_export *exp) off = fed->fed_lr_off; CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n", - fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid); + fed->fed_lr_idx, fed->fed_lr_off, lcd->lcd_uuid); /* Don't clear fed_lr_idx here as it is likely also unset. At worst * we leak a client slot that will be cleaned on the next recovery. */ @@ -429,45 +443,43 @@ static int filter_client_free(struct obd_export *exp) /* Clear the bit _after_ zeroing out the client so we don't race with filter_client_add and zero out new clients.*/ - if (!test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { + if (!cfs_test_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { CERROR("FILTER client %u: bit already clear in bitmap!!\n", fed->fed_lr_idx); LBUG(); } - if (!(exp->exp_flags & OBD_OPT_FAILOVER)) { - /* Don't force sync on disconnect if aborting recovery, - * or it does num_clients * num_osts. b=17194 */ - int need_sync = exp->exp_need_sync && - !(exp->exp_flags&OBD_OPT_ABORT_RECOV); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd, - sizeof(zero_lcd), &off, 0); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + /* Make sure the server's last_transno is up to date. + * This should be done before zeroing client slot so last_transno will + * be in server data or in client data in case of failure */ + filter_update_server_data(obd, filter->fo_rcvd_filp, filter->fo_fsd); + + cfs_mutex_down(&fed->fed_lcd_lock); + rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd, + sizeof(zero_lcd), &off, 0); + fed->fed_lcd = NULL; + cfs_mutex_up(&fed->fed_lcd_lock); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - /* Make sure the server's last_transno is up to date. Do this - * after the client is freed so we know all the client's - * transactions have been committed. */ - if (rc == 0) - filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd, need_sync); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + CDEBUG(rc == 0 ? D_INFO : D_ERROR, + "zero out client %s at idx %u/%llu in %s, rc %d\n", + lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, + LAST_RCVD, rc); - CDEBUG(rc == 0 ? D_INFO : D_ERROR, - "zero out client %s at idx %u/%llu in %s %ssync rc %d\n", - fed->fed_lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off, - LAST_RCVD, need_sync ? "" : "a", rc); - } - - if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) { + if (!cfs_test_and_clear_bit(fed->fed_lr_idx, + filter->fo_last_rcvd_slots)) { CERROR("FILTER client %u: bit already clear in bitmap!!\n", fed->fed_lr_idx); LBUG(); } - - EXIT; + OBD_FREE_PTR(lcd); + RETURN(0); free: - OBD_FREE_PTR(fed->fed_lcd); + cfs_mutex_down(&fed->fed_lcd_lock); fed->fed_lcd = NULL; + cfs_mutex_up(&fed->fed_lcd_lock); + OBD_FREE_PTR(lcd); return 0; } @@ -481,7 +493,7 @@ static inline void filter_fmd_put_nolock(struct filter_export_data *fed, /* XXX when we have persistent reservations and the handle * is stored herein we need to drop it here. */ fed->fed_mod_count--; - list_del(&fmd->fmd_list); + cfs_list_del(&fmd->fmd_list); OBD_SLAB_FREE(fmd, ll_fmd_cachep, sizeof(*fmd)); } } @@ -495,9 +507,9 @@ void filter_fmd_put(struct obd_export *exp, struct filter_mod_data *fmd) return; fed = &exp->exp_filter_data; - spin_lock(&fed->fed_lock); + cfs_spin_lock(&fed->fed_lock); filter_fmd_put_nolock(fed, fmd); /* caller reference */ - spin_unlock(&fed->fed_lock); + cfs_spin_unlock(&fed->fed_lock); } /* expire entries from the end of the list if there are too many @@ -508,25 +520,25 @@ static void filter_fmd_expire_nolock(struct filter_obd *filter, { struct filter_mod_data *fmd, *tmp; - list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { + cfs_list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { if (fmd == keep) break; - if (time_before(jiffies, fmd->fmd_expire) && + if (cfs_time_before(jiffies, fmd->fmd_expire) && fed->fed_mod_count < filter->fo_fmd_max_num) break; - list_del_init(&fmd->fmd_list); + cfs_list_del_init(&fmd->fmd_list); filter_fmd_put_nolock(fed, fmd); /* list reference */ } } void filter_fmd_expire(struct obd_export *exp) { - spin_lock(&exp->exp_filter_data.fed_lock); + cfs_spin_lock(&exp->exp_filter_data.fed_lock); filter_fmd_expire_nolock(&exp->exp_obd->u.filter, &exp->exp_filter_data, NULL); - spin_unlock(&exp->exp_filter_data.fed_lock); + cfs_spin_unlock(&exp->exp_filter_data.fed_lock); } /* find specified objid, group in export fmd list. @@ -539,11 +551,11 @@ static struct filter_mod_data *filter_fmd_find_nolock(struct filter_obd *filter, LASSERT_SPIN_LOCKED(&fed->fed_lock); - list_for_each_entry_reverse(fmd, &fed->fed_mod_list, fmd_list) { + cfs_list_for_each_entry_reverse(fmd, &fed->fed_mod_list, fmd_list) { if (fmd->fmd_id == objid && fmd->fmd_gr == group) { found = fmd; - list_del(&fmd->fmd_list); - list_add_tail(&fmd->fmd_list, &fed->fed_mod_list); + cfs_list_del(&fmd->fmd_list); + cfs_list_add_tail(&fmd->fmd_list, &fed->fed_mod_list); fmd->fmd_expire = jiffies + filter->fo_fmd_max_age; break; } @@ -560,12 +572,12 @@ struct filter_mod_data *filter_fmd_find(struct obd_export *exp, { struct filter_mod_data *fmd; - spin_lock(&exp->exp_filter_data.fed_lock); + cfs_spin_lock(&exp->exp_filter_data.fed_lock); fmd = filter_fmd_find_nolock(&exp->exp_obd->u.filter, &exp->exp_filter_data, objid, group); if (fmd) fmd->fmd_refcount++; /* caller reference */ - spin_unlock(&exp->exp_filter_data.fed_lock); + cfs_spin_unlock(&exp->exp_filter_data.fed_lock); return fmd; } @@ -583,11 +595,12 @@ struct filter_mod_data *filter_fmd_get(struct obd_export *exp, OBD_SLAB_ALLOC_PTR_GFP(fmd_new, ll_fmd_cachep, CFS_ALLOC_IO); - spin_lock(&fed->fed_lock); + cfs_spin_lock(&fed->fed_lock); found = filter_fmd_find_nolock(&exp->exp_obd->u.filter,fed,objid,group); if (fmd_new) { if (found == NULL) { - list_add_tail(&fmd_new->fmd_list, &fed->fed_mod_list); + cfs_list_add_tail(&fmd_new->fmd_list, + &fed->fed_mod_list); fmd_new->fmd_id = objid; fmd_new->fmd_gr = group; fmd_new->fmd_refcount++; /* list reference */ @@ -603,7 +616,7 @@ struct filter_mod_data *filter_fmd_get(struct obd_export *exp, exp->exp_obd->u.filter.fo_fmd_max_age; } - spin_unlock(&fed->fed_lock); + cfs_spin_unlock(&fed->fed_lock); return found; } @@ -617,13 +630,13 @@ static void filter_fmd_drop(struct obd_export *exp, obd_id objid, obd_gr group) { struct filter_mod_data *found = NULL; - spin_lock(&exp->exp_filter_data.fed_lock); + cfs_spin_lock(&exp->exp_filter_data.fed_lock); found = filter_fmd_find_nolock(&exp->exp_filter_data, objid, group); if (found) { - list_del_init(&found->fmd_list); + cfs_list_del_init(&found->fmd_list); filter_fmd_put_nolock(&exp->exp_filter_data, found); } - spin_unlock(&exp->exp_filter_data.fed_lock); + cfs_spin_unlock(&exp->exp_filter_data.fed_lock); } #else #define filter_fmd_drop(exp, objid, group) @@ -635,22 +648,22 @@ static void filter_fmd_cleanup(struct obd_export *exp) struct filter_export_data *fed = &exp->exp_filter_data; struct filter_mod_data *fmd = NULL, *tmp; - spin_lock(&fed->fed_lock); - list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { - list_del_init(&fmd->fmd_list); + cfs_spin_lock(&fed->fed_lock); + cfs_list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { + cfs_list_del_init(&fmd->fmd_list); filter_fmd_put_nolock(fed, fmd); } - spin_unlock(&fed->fed_lock); + cfs_spin_unlock(&fed->fed_lock); } static int filter_init_export(struct obd_export *exp) { - spin_lock_init(&exp->exp_filter_data.fed_lock); + cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - spin_lock(&exp->exp_lock); + cfs_spin_lock(&exp->exp_lock); exp->exp_connecting = 1; - spin_unlock(&exp->exp_lock); + cfs_spin_unlock(&exp->exp_lock); return ldlm_init_export(exp); } @@ -666,7 +679,7 @@ static int filter_free_server_data(struct filter_obd *filter) /* assumes caller is already in kernel ctxt */ int filter_update_server_data(struct obd_device *obd, struct file *filp, - struct lr_server_data *fsd, int force_sync) + struct lr_server_data *fsd) { loff_t off = 0; int rc; @@ -678,7 +691,7 @@ int filter_update_server_data(struct obd_device *obd, struct file *filp, CDEBUG(D_INODE, "server last_mount: "LPU64"\n", le64_to_cpu(fsd->lsd_mount_count)); - rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off, force_sync); + rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off, 0); if (rc) CERROR("error writing lr_server_data: rc = %d\n", rc); @@ -856,6 +869,8 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) continue; } + check_lcd(obd->obd_name, cl_idx, lcd); + last_rcvd = le64_to_cpu(lcd->lcd_last_transno); /* These exports are cleaned up by filter_disconnect(), so they @@ -886,13 +901,13 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) /* VBR: set export last committed */ exp->exp_last_committed = last_rcvd; - spin_lock(&exp->exp_lock); + cfs_spin_lock(&exp->exp_lock); exp->exp_connecting = 0; exp->exp_in_recovery = 0; - spin_unlock(&exp->exp_lock); - spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&exp->exp_lock); + cfs_spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_max_recoverable_clients++; - spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock_bh(&obd->obd_processing_task_lock); lcd = NULL; class_export_put(exp); } @@ -914,7 +929,7 @@ out: fsd->lsd_mount_count = cpu_to_le64(filter->fo_mount_count); /* save it, so mount count and last_transno is current */ - rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1); + rc = filter_update_server_data(obd, filp, filter->fo_fsd); if (rc) GOTO(err_client, rc); @@ -1007,8 +1022,6 @@ static int filter_update_last_group(struct obd_device *obd, int group) CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc); GOTO(cleanup, rc); } - LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group), - "off = %llu and last_group = %d\n", off, last_group); CDEBUG(D_INODE, "%s: previous %d, new %d\n", obd->obd_name, last_group, group); @@ -1082,7 +1095,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, GOTO(cleanup, rc); } - if (filter->fo_subdir_count) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs)); if (tmp_subdirs == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -1146,7 +1159,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, filter->fo_dentry_O_groups[group] = dentry; filter->fo_last_objid_files[group] = filp; - if (filter->fo_subdir_count) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { filter->fo_dentry_O_sub[group] = *tmp_subdirs; OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs)); } @@ -1175,7 +1188,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, if (new_files != NULL) OBD_FREE(new_files, len * sizeof(*new_files)); case 3: - if (filter->fo_subdir_count) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { for (i = 0; i < filter->fo_subdir_count; i++) { if (tmp_subdirs->dentry[i] != NULL) dput(tmp_subdirs->dentry[i]); @@ -1196,15 +1209,14 @@ static int filter_read_groups(struct obd_device *obd, int last_group, struct filter_obd *filter = &obd->u.filter; int old_count, group, rc = 0; - down(&filter->fo_init_lock); + cfs_down(&filter->fo_init_lock); old_count = filter->fo_group_count; for (group = old_count; group <= last_group; group++) { - rc = filter_read_group_internal(obd, group, create); if (rc != 0) break; } - up(&filter->fo_init_lock); + cfs_up(&filter->fo_init_lock); return rc; } @@ -1212,7 +1224,7 @@ static int filter_read_groups(struct obd_device *obd, int last_group, static int filter_prep_groups(struct obd_device *obd) { struct filter_obd *filter = &obd->u.filter; - struct dentry *dentry, *O_dentry; + struct dentry *O_dentry; struct file *filp; int last_group, rc = 0, cleanup_phase = 0; loff_t off = 0; @@ -1229,57 +1241,6 @@ static int filter_prep_groups(struct obd_device *obd) filter->fo_dentry_O = O_dentry; cleanup_phase = 1; /* O_dentry */ - /* Lookup "R" to tell if we're on an old OST FS and need to convert - * from O/R// to O/0//. This can be removed - * some time post 1.0 when all old-style OSTs have converted along - * with the init_objid hack. */ - dentry = ll_lookup_one_len("R", O_dentry, 1); - if (IS_ERR(dentry)) - GOTO(cleanup, rc = PTR_ERR(dentry)); - if (dentry->d_inode && S_ISDIR(dentry->d_inode->i_mode)) { - struct dentry *O0_dentry = lookup_one_len("0", O_dentry, 1); - ENTRY; - - CWARN("converting OST to new object layout\n"); - if (IS_ERR(O0_dentry)) { - rc = PTR_ERR(O0_dentry); - CERROR("error looking up O/0: rc %d\n", rc); - GOTO(cleanup_R, rc); - } - - if (O0_dentry->d_inode) { - CERROR("Both O/R and O/0 exist. Fix manually.\n"); - GOTO(cleanup_O0, rc = -EEXIST); - } - - LOCK_INODE_MUTEX(O_dentry->d_inode); - rc = ll_vfs_rename(O_dentry->d_inode, dentry, filter->fo_vfsmnt, - O_dentry->d_inode, O0_dentry, - filter->fo_vfsmnt); - UNLOCK_INODE_MUTEX(O_dentry->d_inode); - - if (rc) { - CERROR("error renaming O/R to O/0: rc %d\n", rc); - GOTO(cleanup_O0, rc); - } - filter->fo_fsd->lsd_feature_incompat |= - cpu_to_le32(OBD_INCOMPAT_GROUPS); - rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd, 1); - GOTO(cleanup_O0, rc); - - cleanup_O0: - f_dput(O0_dentry); - cleanup_R: - f_dput(dentry); - if (rc) - GOTO(cleanup, rc); - } else { - f_dput(dentry); - } - - cleanup_phase = 2; /* groups */ - /* we have to initialize all groups before first connections from * clients because they may send create/destroy for any group -bzzz */ filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700); @@ -1287,21 +1248,19 @@ static int filter_prep_groups(struct obd_device *obd) CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp)); GOTO(cleanup, rc = PTR_ERR(filp)); } - cleanup_phase = 3; /* filp */ + cleanup_phase = 2; /* filp */ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off); if (rc) { CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc); GOTO(cleanup, rc); } - if (off == 0) { - last_group = FILTER_MIN_GROUPS; - } else { - LASSERT_MDS_GROUP(last_group); - } + + if (off == 0) + last_group = FILTER_GROUP_MDS0; CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, - FILTER_MIN_GROUPS, last_group); + FILTER_GROUP_MDS0, last_group); filter->fo_committed_group = last_group; rc = filter_read_groups(obd, last_group, 1); if (rc) @@ -1312,11 +1271,10 @@ static int filter_prep_groups(struct obd_device *obd) cleanup: switch (cleanup_phase) { - case 3: - filp_close(filp, 0); case 2: - filter_cleanup_groups(obd); + filp_close(filp, 0); case 1: + filter_cleanup_groups(obd); f_dput(filter->fo_dentry_O); filter->fo_dentry_O = NULL; default: @@ -1378,7 +1336,7 @@ static int filter_prep(struct obd_device *obd) HEALTH_CHECK, rc); GOTO(err_server_data, rc); } - filter->fo_health_check_filp = file; + filter->fo_obt.obt_health_check_filp = file; if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK, file->f_dentry->d_inode->i_mode); @@ -1397,9 +1355,9 @@ out: return(rc); err_health_check: - if (filp_close(filter->fo_health_check_filp, 0)) + if (filp_close(filter->fo_obt.obt_health_check_filp, 0)) CERROR("can't close %s after error\n", HEALTH_CHECK); - filter->fo_health_check_filp = NULL; + filter->fo_obt.obt_health_check_filp = NULL; err_server_data: target_recovery_fini(obd); filter_free_server_data(filter); @@ -1423,7 +1381,7 @@ static void filter_post(struct obd_device *obd) push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = filter_update_server_data(obd, filter->fo_rcvd_filp, - filter->fo_fsd, 0); + filter->fo_fsd); if (rc) CERROR("error writing server data: rc = %d\n", rc); @@ -1440,8 +1398,8 @@ static void filter_post(struct obd_device *obd) if (rc) CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc); - rc = filp_close(filter->fo_health_check_filp, 0); - filter->fo_health_check_filp = NULL; + rc = filp_close(filter->fo_obt.obt_health_check_filp, 0); + filter->fo_obt.obt_health_check_filp = NULL; if (rc) CERROR("error closing %s: rc = %d\n", HEALTH_CHECK, rc); @@ -1459,9 +1417,9 @@ static void filter_set_last_id(struct filter_obd *filter, LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); - spin_lock(&filter->fo_objidlock); + cfs_spin_lock(&filter->fo_objidlock); filter->fo_last_objids[group] = id; - spin_unlock(&filter->fo_objidlock); + cfs_spin_unlock(&filter->fo_objidlock); } obd_id filter_last_id(struct filter_obd *filter, obd_gr group) @@ -1469,11 +1427,12 @@ obd_id filter_last_id(struct filter_obd *filter, obd_gr group) obd_id id; LASSERT(filter->fo_fsd != NULL); LASSERT(group <= filter->fo_group_count); + LASSERT(filter->fo_last_objids != NULL); /* FIXME: object groups */ - spin_lock(&filter->fo_objidlock); + cfs_spin_lock(&filter->fo_objidlock); id = filter->fo_last_objids[group]; - spin_unlock(&filter->fo_objidlock); + cfs_spin_unlock(&filter->fo_objidlock); return id; } @@ -1490,8 +1449,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) struct filter_subdirs *subdirs; LASSERT(group < filter->fo_group_count); /* FIXME: object groups */ - if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) || - filter->fo_subdir_count == 0) + if (!filter_group_is_mds(group) || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; subdirs = &filter->fo_dentry_O_sub[group]; @@ -1601,7 +1559,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, - NULL, lockh); + lockh); if (rc != ELDLM_OK) lockh->cookie = 0; RETURN(rc); @@ -1610,7 +1568,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, static void filter_fini_destroy(struct obd_device *obd, struct lustre_handle *lockh) { - if (lockh->cookie) + if (lustre_handle_is_used(lockh)) ldlm_lock_decref(lockh, LCK_PW); } @@ -1713,7 +1671,7 @@ static enum interval_iter filter_intent_cb(struct interval_node *n, if (interval_high(n) <= size) return INTERVAL_ITER_STOP; - list_for_each_entry(lck, &node->li_group, l_sl_policy) { + cfs_list_for_each_entry(lck, &node->li_group, l_sl_policy) { /* Don't send glimpse ASTs to liblustre clients. * They aren't listening for them, and they do * entirely synchronous I/O anyways. */ @@ -1790,13 +1748,14 @@ static int filter_intent_policy(struct ldlm_namespace *ns, /* FIXME: we should change the policy function slightly, to not make * this list at all, since we just turn around and free it */ - while (!list_empty(&rpc_list)) { + while (!cfs_list_empty(&rpc_list)) { struct ldlm_lock *wlock = - list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast); + cfs_list_entry(rpc_list.next, struct ldlm_lock, + l_cp_ast); LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); LASSERT(lock->l_flags & LDLM_FL_CP_REQD); lock->l_flags &= ~LDLM_FL_CP_REQD; - list_del_init(&wlock->l_cp_ast); + cfs_list_del_init(&wlock->l_cp_ast); LDLM_LOCK_RELEASE(wlock); } @@ -1859,7 +1818,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * * Of course, this will all disappear when we switch to * taking liblustre locks on the OST. */ - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); } RETURN(ELDLM_LOCK_ABORTED); } @@ -1886,7 +1845,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * sending ast is not handled. This can result in lost client writes. */ if (rc != 0) - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); lock_res(res); *reply_lvb = *res_lvb; @@ -1965,10 +1924,10 @@ static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial) sptlrpc_target_update_exp_flavor(obd, &tmp_rset); - write_lock(&filter->fo_sptlrpc_lock); + cfs_write_lock(&filter->fo_sptlrpc_lock); sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset); filter->fo_sptlrpc_rset = tmp_rset; - write_unlock(&filter->fo_sptlrpc_lock); + cfs_write_unlock(&filter->fo_sptlrpc_lock); return 0; } @@ -2103,16 +2062,16 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, obd->obd_lvfs_ctxt.fs = get_ds(); obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops; - init_mutex(&filter->fo_init_lock); + cfs_init_mutex(&filter->fo_init_lock); filter->fo_committed_group = 0; filter->fo_destroys_in_progress = 0; for (i = 0; i < 32; i++) - sema_init(&filter->fo_create_locks[i], 1); + cfs_sema_init(&filter->fo_create_locks[i], 1); - spin_lock_init(&filter->fo_translock); - spin_lock_init(&filter->fo_objidlock); + cfs_spin_lock_init(&filter->fo_translock); + cfs_spin_lock_init(&filter->fo_objidlock); CFS_INIT_LIST_HEAD(&filter->fo_export_list); - sema_init(&filter->fo_alloc_lock, 1); + cfs_sema_init(&filter->fo_alloc_lock, 1); init_brw_stats(&filter->fo_filter_stats); filter->fo_read_cache = 1; /* enable read-only cache by default */ filter->fo_writethrough_cache = 1; /* enable writethrough cache */ @@ -2125,7 +2084,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, GOTO(err_ops, rc); CFS_INIT_LIST_HEAD(&filter->fo_llog_list); - spin_lock_init(&filter->fo_llog_list_lock); + cfs_spin_lock_init(&filter->fo_llog_list_lock); filter->fo_fl_oss_capa = 1; @@ -2152,7 +2111,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, GOTO(err_post, rc); } - rwlock_init(&filter->fo_sptlrpc_lock); + cfs_rwlock_init(&filter->fo_sptlrpc_lock); sptlrpc_rule_set_init(&filter->fo_sptlrpc_rset); /* do this after llog being initialized */ filter_adapt_sptlrpc_conf(obd, 1); @@ -2447,20 +2406,20 @@ static int filter_llog_finish(struct obd_device *obd, int count) * This is safe to do, as llog is already synchronized * and its import may go. */ - mutex_down(&ctxt->loc_sem); + cfs_mutex_down(&ctxt->loc_sem); if (ctxt->loc_imp) { class_import_put(ctxt->loc_imp); ctxt->loc_imp = NULL; } - mutex_up(&ctxt->loc_sem); + cfs_mutex_up(&ctxt->loc_sem); llog_ctxt_put(ctxt); } if (filter->fo_lcm) { - mutex_down(&ctxt->loc_sem); + cfs_mutex_down(&ctxt->loc_sem); llog_recov_thread_fini(filter->fo_lcm, obd->obd_force); filter->fo_lcm = NULL; - mutex_up(&ctxt->loc_sem); + cfs_mutex_up(&ctxt->loc_sem); } RETURN(filter_olg_fini(&obd->obd_olg)); } @@ -2474,7 +2433,7 @@ filter_find_olg_internal(struct filter_obd *filter, int group) struct obd_llog_group *olg; LASSERT_SPIN_LOCKED(&filter->fo_llog_list_lock); - list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { + cfs_list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { if (olg->olg_group == group) RETURN(olg); } @@ -2494,9 +2453,9 @@ struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) if (group == FILTER_GROUP_LLOG) RETURN(&obd->obd_olg); - spin_lock(&filter->fo_llog_list_lock); + cfs_spin_lock(&filter->fo_llog_list_lock); olg = filter_find_olg_internal(filter, group); - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); RETURN(olg); } @@ -2516,7 +2475,7 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) if (group == FILTER_GROUP_LLOG) RETURN(&obd->obd_olg); - spin_lock(&filter->fo_llog_list_lock); + cfs_spin_lock(&filter->fo_llog_list_lock); olg = filter_find_olg_internal(filter, group); if (olg) { if (olg->olg_initializing) { @@ -2530,28 +2489,28 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) GOTO(out_unlock, olg = ERR_PTR(-ENOMEM)); llog_group_init(olg, group); - list_add(&olg->olg_list, &filter->fo_llog_list); + cfs_list_add(&olg->olg_list, &filter->fo_llog_list); olg->olg_initializing = 1; - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); rc = obd_llog_init(obd, olg, obd, NULL); if (rc) { - spin_lock(&filter->fo_llog_list_lock); - list_del(&olg->olg_list); - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_lock(&filter->fo_llog_list_lock); + cfs_list_del(&olg->olg_list); + cfs_spin_unlock(&filter->fo_llog_list_lock); OBD_FREE_PTR(olg); GOTO(out, olg = ERR_PTR(-ENOMEM)); } - spin_lock(&filter->fo_llog_list_lock); + cfs_spin_lock(&filter->fo_llog_list_lock); olg->olg_initializing = 0; - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n", obd->obd_name, group, olg); out: RETURN(olg); out_unlock: - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); GOTO(out, olg); } @@ -2584,9 +2543,9 @@ static int filter_llog_connect(struct obd_export *exp, obd->obd_name, body->lgdc_logid.lgl_oid, body->lgdc_logid.lgl_ogr, body->lgdc_logid.lgl_ogen); - spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock_bh(&obd->obd_processing_task_lock); obd->u.filter.fo_mds_ost_sync = 1; - spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock_bh(&obd->obd_processing_task_lock); rc = llog_connect(ctxt, &body->lgdc_logid, &body->lgdc_gen, NULL); llog_ctxt_put(ctxt); @@ -2601,7 +2560,7 @@ static int filter_llog_preclean(struct obd_device *obd) { struct obd_llog_group *olg, *tmp; struct filter_obd *filter; - struct list_head remove_list; + cfs_list_t remove_list; int rc = 0; ENTRY; @@ -2612,17 +2571,17 @@ static int filter_llog_preclean(struct obd_device *obd) filter = &obd->u.filter; CFS_INIT_LIST_HEAD(&remove_list); - spin_lock(&filter->fo_llog_list_lock); - while (!list_empty(&filter->fo_llog_list)) { - olg = list_entry(filter->fo_llog_list.next, - struct obd_llog_group, olg_list); - list_del(&olg->olg_list); - list_add(&olg->olg_list, &remove_list); + cfs_spin_lock(&filter->fo_llog_list_lock); + while (!cfs_list_empty(&filter->fo_llog_list)) { + olg = cfs_list_entry(filter->fo_llog_list.next, + struct obd_llog_group, olg_list); + cfs_list_del(&olg->olg_list); + cfs_list_add(&olg->olg_list, &remove_list); } - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); - list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) { - list_del_init(&olg->olg_list); + cfs_list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) { + cfs_list_del_init(&olg->olg_list); rc = filter_olg_fini(olg); if (rc) CERROR("failed to cleanup llogging subsystem for %u\n", @@ -2695,7 +2654,8 @@ static int filter_cleanup(struct obd_device *obd) } static int filter_connect_internal(struct obd_export *exp, - struct obd_connect_data *data) + struct obd_connect_data *data, + int reconnect) { struct filter_export_data *fed = &exp->exp_filter_data; @@ -2731,18 +2691,18 @@ static int filter_connect_internal(struct obd_export *exp, struct filter_obd *filter = &exp->exp_obd->u.filter; obd_size left, want; - spin_lock(&exp->exp_obd->obd_osfs_lock); + cfs_spin_lock(&exp->exp_obd->obd_osfs_lock); left = filter_grant_space_left(exp); want = data->ocd_grant; - filter_grant(exp, fed->fed_grant, want, left); + filter_grant(exp, fed->fed_grant, want, left, (reconnect == 0)); data->ocd_grant = fed->fed_grant; - spin_unlock(&exp->exp_obd->obd_osfs_lock); + cfs_spin_unlock(&exp->exp_obd->obd_osfs_lock); CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %d want: " LPU64" left: "LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_grant, want, left); - + filter->fo_tot_granted_clients ++; } @@ -2756,8 +2716,10 @@ static int filter_connect_internal(struct obd_export *exp, /* this will only happen on the first connect */ lsd->lsd_ost_index = cpu_to_le32(data->ocd_index); lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_OST); + /* sync is not needed here as filter_client_add will + * set exp_need_sync flag */ filter_update_server_data(exp->exp_obd, - filter->fo_rcvd_filp, lsd, 1); + filter->fo_rcvd_filp, lsd); } else if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" @@ -2820,7 +2782,7 @@ static int filter_reconnect(const struct lu_env *env, if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); - rc = filter_connect_internal(exp, data); + rc = filter_connect_internal(exp, data, 1); if (rc == 0) filter_export_stats_init(obd, exp, localdata); @@ -2853,7 +2815,7 @@ static int filter_connect(const struct lu_env *env, fed = &lexp->exp_filter_data; - rc = filter_connect_internal(lexp, data); + rc = filter_connect_internal(lexp, data, 0); if (rc) GOTO(cleanup, rc); @@ -2894,6 +2856,7 @@ cleanup: fed->fed_lcd = NULL; } class_disconnect(lexp); + lprocfs_exp_cleanup(lexp); *exp = NULL; } else { *exp = lexp; @@ -2913,7 +2876,7 @@ static void filter_grant_sanity_check(struct obd_device *obd, const char *func) obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0; obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted; - if (list_empty(&obd->obd_exports)) + if (cfs_list_empty(&obd->obd_exports)) return; /* We don't want to do this for large machines that do lots of @@ -2921,9 +2884,9 @@ static void filter_grant_sanity_check(struct obd_device *obd, const char *func) if (obd->obd_num_exports > 100) return; - spin_lock(&obd->obd_osfs_lock); - spin_lock(&obd->obd_dev_lock); - list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) { + cfs_spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->obd_dev_lock); + cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) { int error = 0; fed = &exp->exp_filter_data; if (fed->fed_grant < 0 || fed->fed_pending < 0 || @@ -2954,8 +2917,8 @@ static void filter_grant_sanity_check(struct obd_device *obd, const char *func) fo_tot_granted = obd->u.filter.fo_tot_granted; fo_tot_pending = obd->u.filter.fo_tot_pending; fo_tot_dirty = obd->u.filter.fo_tot_dirty; - spin_unlock(&obd->obd_dev_lock); - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_dev_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); /* Do these assertions outside the spinlocks so we don't kill system */ if (tot_granted != fo_tot_granted) @@ -2988,11 +2951,7 @@ static void filter_grant_discard(struct obd_export *exp) struct filter_obd *filter = &obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; - spin_lock(&obd->obd_osfs_lock); - spin_lock(&obd->obd_dev_lock); - list_del_init(&exp->exp_obd_chain); - spin_unlock(&obd->obd_dev_lock); - + cfs_spin_lock(&obd->obd_osfs_lock); LASSERTF(filter->fo_tot_granted >= fed->fed_grant, "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n", obd->obd_name, filter->fo_tot_granted, @@ -3011,7 +2970,7 @@ static void filter_grant_discard(struct obd_export *exp) fed->fed_dirty = 0; fed->fed_grant = 0; - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); } static int filter_destroy_export(struct obd_export *exp) @@ -3032,9 +2991,7 @@ static int filter_destroy_export(struct obd_export *exp) RETURN(0); - if (exp->exp_obd->obd_replayable) - filter_client_free(exp); - else + if (!exp->exp_obd->obd_replayable) fsfilt_sync(exp->exp_obd, exp->exp_obd->u.obt.obt_sb); filter_grant_discard(exp); @@ -3069,8 +3026,8 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) /* look for group with min. number, but > worked */ olg_min = NULL; group = 1 << 30; - spin_lock(&filter->fo_llog_list_lock); - list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { + cfs_spin_lock(&filter->fo_llog_list_lock); + cfs_list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { if (olg->olg_group <= worked) { /* this group is already synced */ continue; @@ -3083,7 +3040,7 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) olg_min = olg; group = olg->olg_group; } - spin_unlock(&filter->fo_llog_list_lock); + cfs_spin_unlock(&filter->fo_llog_list_lock); if (olg_min == NULL) break; @@ -3125,14 +3082,12 @@ static int filter_disconnect(struct obd_export *exp) lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); - /* Disconnect early so that clients can't keep using export */ - rc = class_disconnect(exp); - if (exp->exp_obd->obd_namespace != NULL) - ldlm_cancel_locks_for_export(exp); + rc = server_disconnect_export(exp); - fsfilt_sync(obd, obd->u.obt.obt_sb); - - lprocfs_exp_cleanup(exp); + if (exp->exp_obd->obd_replayable) + filter_client_free(exp); + else + fsfilt_sync(obd, obd->u.obt.obt_sb); class_export_put(exp); RETURN(rc); @@ -3193,7 +3148,8 @@ static int filter_getattr(struct obd_export *exp, struct obd_info *oinfo) int rc = 0; ENTRY; - rc = filter_auth_capa(exp, NULL, oinfo_mdsno(oinfo), + LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_gr, oinfo_capa(oinfo), CAPA_OPC_META_READ); if (rc) RETURN(rc); @@ -3264,6 +3220,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, loff_t old_size = 0; unsigned int ia_valid; struct inode *inode; + struct page *page = NULL; struct iattr iattr; void *handle; ENTRY; @@ -3299,6 +3256,17 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, if (rc) GOTO(out_unlock, rc); + /* Let's pin the last page so that ldiskfs_truncate + * should not start GFP_FS allocation. */ + if (ia_valid & ATTR_SIZE) { + page = grab_cache_page(inode->i_mapping, + iattr.ia_size >> PAGE_CACHE_SHIFT); + if (page == NULL) + GOTO(out_unlock, rc = -ENOMEM); + + unlock_page(page); + } + /* If the inode still has SUID+SGID bits set (see filter_precreate()) * then we will accept the UID+GID sent by the client during write for * initializing the ownership of this inode. We only allow this to @@ -3343,8 +3311,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, GOTO(out_unlock, rc = PTR_ERR(handle)); } if (oa->o_valid & OBD_MD_FLFLAGS) { - rc = fsfilt_iocontrol(exp->exp_obd, inode, NULL, - EXT3_IOC_SETFLAGS, (long)&oa->o_flags); + rc = fsfilt_iocontrol(exp->exp_obd, dentry, + FSFILT_IOC_SETFLAGS, (long)&oa->o_flags); } else { rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1); if (fcc != NULL) @@ -3387,7 +3355,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, } EXIT; + out_unlock: + if (page) + page_cache_release(page); + if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) UNLOCK_INODE_MUTEX(inode); if (ia_valid & ATTR_SIZE) @@ -3425,12 +3397,14 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, if (oa->o_valid & OBD_FL_TRUNC) opc |= CAPA_OPC_OSS_TRUNC; - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, opc); + + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, opc); if (rc) RETURN(rc); if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID)) { - rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa); + rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa); if (rc) RETURN(rc); } @@ -3462,7 +3436,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, filter = &exp->exp_obd->u.filter; push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - lock_kernel(); if (oa->o_valid & (OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME)) { @@ -3482,7 +3455,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, if (res != NULL) { LDLM_RESOURCE_ADDREF(res); - rc = ldlm_res_lvbo_update(res, NULL, 0, 0); + rc = ldlm_res_lvbo_update(res, NULL, 0); LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); } @@ -3495,7 +3468,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, EXIT; out_unlock: - unlock_kernel(); f_dput(dentry); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); return rc; @@ -3578,7 +3550,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, doa.o_gr = oa->o_gr; doa.o_mode = S_IFREG; - if (!test_bit(doa.o_gr, &filter->fo_destroys_in_progress)) { + if (!cfs_test_bit(doa.o_gr, &filter->fo_destroys_in_progress)) { CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", exp->exp_obd->obd_name, doa.o_gr); RETURN(0); @@ -3618,7 +3590,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, oa->o_id = last; rc = 0; } - clear_bit(doa.o_gr, &filter->fo_destroys_in_progress); + cfs_clear_bit(doa.o_gr, &filter->fo_destroys_in_progress); RETURN(rc); } @@ -3644,12 +3616,12 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, RETURN(0); } /* This causes inflight precreates to abort and drop lock */ - set_bit(group, &filter->fo_destroys_in_progress); - down(&filter->fo_create_locks[group]); - if (!test_bit(group, &filter->fo_destroys_in_progress)) { + cfs_set_bit(group, &filter->fo_destroys_in_progress); + cfs_down(&filter->fo_create_locks[group]); + if (!cfs_test_bit(group, &filter->fo_destroys_in_progress)) { CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", exp->exp_obd->obd_name, group); - up(&filter->fo_create_locks[group]); + cfs_up(&filter->fo_create_locks[group]); RETURN(0); } diff = oa->o_id - last; @@ -3671,17 +3643,17 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, GOTO(out, rc); } else { /* XXX: Used by MDS for the first time! */ - clear_bit(group, &filter->fo_destroys_in_progress); + cfs_clear_bit(group, &filter->fo_destroys_in_progress); } } else { - down(&filter->fo_create_locks[group]); + cfs_down(&filter->fo_create_locks[group]); if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old precreate request\n", obd->obd_name); GOTO(out, rc = 0); } /* only precreate if group == 0 and o_id is specfied */ - if (group == FILTER_GROUP_LLOG || oa->o_id == 0) + if (!filter_group_is_mds(group) || oa->o_id == 0) diff = 1; else diff = oa->o_id - filter_last_id(filter, group); @@ -3703,7 +3675,7 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, /* else diff == 0 */ GOTO(out, rc = 0); out: - up(&filter->fo_create_locks[group]); + cfs_up(&filter->fo_create_locks[group]); return rc; } @@ -3718,10 +3690,10 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, /* at least try to account for cached pages. its still racey and * might be under-reporting if clients haven't announced their * caches with brw recently */ - spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->obd_osfs_lock); rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age); memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64 " pending "LPU64" free "LPU64" avail "LPU64"\n", @@ -3735,6 +3707,17 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, ((filter->fo_tot_dirty + filter->fo_tot_pending + osfs->os_bsize - 1) >> blockbits)); + if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) { + struct lr_server_data *lsd = filter->fo_fsd; + int index = le32_to_cpu(lsd->lsd_ost_index); + + if (obd_fail_val == -1 || + index == obd_fail_val) + osfs->os_bfree = osfs->os_bavail = 2; + else if (obd_fail_loc & OBD_FAIL_ONCE) + obd_fail_loc &= ~OBD_FAILED; /* reset flag */ + } + /* set EROFS to state field if FS is mounted as RDONLY. The goal is to * stop creating files on MDS if OST is not good shape to create * objects.*/ @@ -3777,11 +3760,11 @@ static __u64 filter_calc_free_inodes(struct obd_device *obd) int rc; __u64 os_ffree = -1; - spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->obd_osfs_lock); rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, cfs_time_shift_64(1)); if (rc == 0) os_ffree = obd->obd_osfs.os_ffree; - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); return os_ffree; } @@ -3823,7 +3806,8 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, OBD_ALLOC(osfs, sizeof(*osfs)); if (osfs == NULL) RETURN(-ENOMEM); - rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ, 0); + rc = filter_statfs(obd, osfs, cfs_time_current_64() - CFS_HZ, + 0); if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { CDEBUG(D_RPCTRACE,"%s: not enough space for create " LPU64"\n", obd->obd_name, osfs->os_bavail << @@ -3842,7 +3826,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, for (i = 0; i < *num && err == 0; i++) { int cleanup_phase = 0; - if (test_bit(group, &filter->fo_destroys_in_progress)) { + if (cfs_test_bit(group, &filter->fo_destroys_in_progress)) { CWARN("%s: create aborted by destroy\n", obd->obd_name); rc = -EAGAIN; @@ -3959,7 +3943,7 @@ set_last_id: if (rc) break; - if (time_after(jiffies, enough_time)) { + if (cfs_time_after(jiffies, enough_time)) { CDEBUG(D_RPCTRACE, "%s: precreate slow - want %d got %d \n", obd->obd_name, *num, i); @@ -4028,9 +4012,9 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, rc = -EINVAL; } else { diff = 1; - down(&filter->fo_create_locks[oa->o_gr]); + cfs_down(&filter->fo_create_locks[oa->o_gr]); rc = filter_precreate(obd, oa, oa->o_gr, &diff); - up(&filter->fo_create_locks[oa->o_gr]); + cfs_up(&filter->fo_create_locks[oa->o_gr]); } } else { rc = filter_handle_precreate(exp, oa, oa->o_gr, oti); @@ -4068,8 +4052,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, ENTRY; LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), + rc = filter_auth_capa(exp, NULL, oa->o_gr, (struct lustre_capa *)capa, CAPA_OPC_OSS_DESTROY); if (rc) RETURN(rc); @@ -4095,15 +4078,14 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, if (oa->o_valid & OBD_MD_FLCOOKIE) { struct llog_ctxt *ctxt; struct obd_llog_group *olg; - fcc = &oa->o_lcookie; + olg = filter_find_olg(obd, oa->o_gr); if (!olg) { CERROR(" %s: can not find olg of group %d\n", obd->obd_name, (int)oa->o_gr); GOTO(cleanup, rc = PTR_ERR(olg)); } - llog_group_set_export(olg, exp); - + fcc = &oa->o_lcookie; ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1); llog_cancel(ctxt, NULL, 1, fcc, 0); llog_ctxt_put(ctxt); @@ -4112,7 +4094,9 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, GOTO(cleanup, rc = -ENOENT); } - filter_prepare_destroy(obd, oa->o_id, oa->o_gr, &lockh); + rc = filter_prepare_destroy(obd, oa->o_id, oa->o_gr, &lockh); + if (rc) + GOTO(cleanup, rc); /* Our MDC connection is established by the MDS to us */ if (oa->o_valid & OBD_MD_FLCOOKIE) { @@ -4128,18 +4112,28 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, * down(i_zombie) down(i_zombie) * restart transaction * (see BUG 4180) -bzzz + * + * take i_alloc_sem too to prevent other threads from writing to the + * file while we are truncating it. This can cause lock ordering issue + * between page lock, i_mutex & starting new journal handle. + * (see bug 20321) -johann */ + down_write(&dchild->d_inode->i_alloc_sem); LOCK_INODE_MUTEX(dchild->d_inode); /* VBR: version recovery check */ rc = filter_version_get_check(exp, oti, dchild->d_inode); - if (rc) + if (rc) { + UNLOCK_INODE_MUTEX(dchild->d_inode); + up_write(&dchild->d_inode->i_alloc_sem); GOTO(cleanup, rc); + } handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR, NULL, 1); if (IS_ERR(handle)) { UNLOCK_INODE_MUTEX(dchild->d_inode); + up_write(&dchild->d_inode->i_alloc_sem); GOTO(cleanup, rc = PTR_ERR(handle)); } @@ -4148,6 +4142,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, rc = fsfilt_setattr(obd, dchild, handle, &iattr, 1); rc2 = fsfilt_commit(obd, dchild->d_inode, handle, 0); UNLOCK_INODE_MUTEX(dchild->d_inode); + up_write(&dchild->d_inode->i_alloc_sem); if (rc) GOTO(cleanup, rc); if (rc2) @@ -4266,7 +4261,8 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, int rc, rc2; ENTRY; - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE); if (rc) RETURN(rc); @@ -4380,8 +4376,8 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen, memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap)); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_iocontrol(obd, dentry->d_inode, NULL, - EXT3_IOC_FIEMAP, (long)fiemap); + rc = fsfilt_iocontrol(obd, dentry, FSFILT_IOC_FIEMAP, + (long)fiemap); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); f_dput(dentry); @@ -4418,9 +4414,9 @@ static int filter_set_grant_shrink(struct obd_export *exp, struct ost_body *body) { /* handle shrink grant */ - spin_lock(&exp->exp_obd->obd_osfs_lock); + cfs_spin_lock(&exp->exp_obd->obd_osfs_lock); filter_grant_incoming(exp, &body->oa); - spin_unlock(&exp->exp_obd->obd_osfs_lock); + cfs_spin_unlock(&exp->exp_obd->obd_osfs_lock); RETURN(0); @@ -4587,8 +4583,8 @@ static int filter_health_check(struct obd_device *obd) rc = 1; #ifdef USE_HEALTH_CHECK_WRITE - LASSERT(filter->fo_health_check_filp != NULL); - rc |= !!lvfs_check_io_health(obd, filter->fo_health_check_filp); + LASSERT(filter->fo_obt.obt_health_check_filp != NULL); + rc |= !!lvfs_check_io_health(obd, filter->fo_obt.obt_health_check_filp); #endif return rc; } @@ -4662,11 +4658,15 @@ extern quota_interface_t filter_quota_interface; static int __init obdfilter_init(void) { struct lprocfs_static_vars lvars; - int rc; + int rc, i; + + /** sanity check for group<->mdsno conversion */ + for (i = 0; i < 32; i++) + LASSERT(objgrp_to_mdsno(mdt_to_obd_objgrp(i)) == i); lprocfs_filter_init_vars(&lvars); - request_module("lquota"); + cfs_request_module("%s", "lquota"); OBD_ALLOC(obdfilter_created_scratchpad, OBDFILTER_CREATED_SCRATCHPAD_ENTRIES * sizeof(*obdfilter_created_scratchpad));