X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=f00b738061b7758adee815d674c45b5c966bc26b;hb=c1f6b32958c799412c830f35f8d16ed7275407ea;hp=3bf974d7072b60ddcec199ac58381a5ebf9250e5;hpb=021b91611ca861389299126f85d597eb965ff33a;p=fs%2Flustre-release.git diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 3bf974d..f00b738 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -47,6 +47,7 @@ #include #include +#include #include #include #include @@ -63,7 +64,7 @@ #include "filter_internal.h" /* Group 0 is no longer a legal group, to catch uninitialized IDs */ -#define FILTER_MIN_GROUPS 3 +#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0 static struct lvfs_callback_ops filter_lvfs_ops; cfs_mem_cache_t *ll_fmd_cachep; @@ -82,7 +83,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, struct filter_client_data *fcd = fed->fed_fcd; __u64 last_rcvd; loff_t off; - int err, log_pri = D_HA; + int err, log_pri = D_RPCTRACE; /* Propagate error code. */ if (rc) @@ -159,14 +160,34 @@ static void init_brw_stats(struct brw_stats *brw_stats) spin_lock_init(&brw_stats->hist[i].oh_lock); } +static int lprocfs_init_rw_stats(struct obd_device *obd, + struct lprocfs_stats **stats) +{ + int num_stats; + + num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) + + LPROC_FILTER_LAST - 1; + *stats = lprocfs_alloc_stats(num_stats, 0); + if (*stats == NULL) + return -ENOMEM; + + lprocfs_init_ops_stats(LPROC_FILTER_LAST, *stats); + lprocfs_counter_init(*stats, LPROC_FILTER_READ_BYTES, + LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); + lprocfs_counter_init(*stats, LPROC_FILTER_WRITE_BYTES, + LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + + return(0); +} + /* brw_stats are 2128, ops are 3916, ldlm are 204, so 6248 bytes per client, plus the procfs overhead :( */ static int filter_export_stats_init(struct obd_device *obd, - struct obd_export *exp) + struct obd_export *exp, + void *client_nid) { struct filter_export_data *fed = &exp->exp_filter_data; - struct proc_dir_entry *brw_entry; - int rc, num_stats; + int rc, newnid = 0; ENTRY; init_brw_stats(&fed->fed_brw_stats); @@ -175,30 +196,35 @@ static int filter_export_stats_init(struct obd_device *obd, /* Self-export gets no proc entry */ RETURN(0); - rc = lprocfs_exp_setup(exp); + rc = lprocfs_exp_setup(exp, client_nid, &newnid); if (rc) RETURN(rc); - /* Create a per export proc entry for brw_stats */ - brw_entry = create_proc_entry("brw_stats", 0644, exp->exp_proc); - if (brw_entry == NULL) - RETURN(-ENOMEM); - brw_entry->proc_fops = &filter_per_export_stats_fops; - brw_entry->data = fed; + if (newnid) { + struct nid_stat *tmp = exp->exp_nid_stats; + LASSERT(tmp != NULL); + + OBD_ALLOC(tmp->nid_brw_stats, sizeof(struct brw_stats)); + if (tmp->nid_brw_stats == NULL) + RETURN(-ENOMEM); + + init_brw_stats(tmp->nid_brw_stats); + rc = lprocfs_seq_create(exp->exp_nid_stats->nid_proc, "brw_stats", + 0644, &filter_per_nid_stats_fops, + exp->exp_nid_stats); + if (rc) + CWARN("Error adding the brw_stats file\n"); + + rc = lprocfs_init_rw_stats(obd, &exp->exp_nid_stats->nid_stats); + if (rc) + RETURN(rc); + + rc = lprocfs_register_stats(tmp->nid_proc, "stats", + tmp->nid_stats); + if (rc) + RETURN(rc); + } - /* Create a per export proc entry for ops stats */ - num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) + - LPROC_FILTER_LAST - 1; - exp->exp_ops_stats = lprocfs_alloc_stats(num_stats, - LPROCFS_STATS_FLAG_NOPERCPU); - if (exp->exp_ops_stats == NULL) - RETURN(-ENOMEM); - lprocfs_init_ops_stats(LPROC_FILTER_LAST, exp->exp_ops_stats); - lprocfs_counter_init(exp->exp_ops_stats, LPROC_FILTER_READ_BYTES, - LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); - lprocfs_counter_init(exp->exp_ops_stats, LPROC_FILTER_WRITE_BYTES, - LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); - lprocfs_register_stats(exp->exp_proc, "stats", exp->exp_ops_stats); RETURN(0); } @@ -548,8 +574,8 @@ static void filter_fmd_cleanup(struct obd_export *exp) static int filter_init_export(struct obd_export *exp) { spin_lock_init(&exp->exp_filter_data.fed_lock); - INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - + CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); @@ -771,7 +797,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) fed = &exp->exp_filter_data; fed->fed_fcd = fcd; fed->fed_group = le32_to_cpu(fcd->fcd_group); - filter_export_stats_init(obd, exp); + filter_export_stats_init(obd, exp, NULL); rc = filter_client_add(obd, exp, cl_idx); /* can't fail for existing client */ LASSERTF(rc == 0, "rc = %d\n", rc); @@ -1388,6 +1414,8 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group, if (IS_ERR(dparent)) return dparent; + if (dparent == NULL) + return ERR_PTR(-ENOENT); rc = filter_lock_dentry(obd, dparent); fsfilt_check_slow(obd, now, obd_timeout, "parent lock"); @@ -1564,20 +1592,68 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid, return(rc); } +struct filter_intent_args { + struct ldlm_lock **victim; + __u64 size; + int *liblustre; +}; + +static enum interval_iter filter_intent_cb(struct interval_node *n, + void *args) +{ + struct ldlm_interval *node = (struct ldlm_interval *)n; + struct filter_intent_args *arg = (struct filter_intent_args*)args; + __u64 size = arg->size; + struct ldlm_lock **v = arg->victim; + struct ldlm_lock *lck; + + /* If the interval is lower than the current file size, + * just break. */ + if (interval_high(n) <= size) + return INTERVAL_ITER_STOP; + + list_for_each_entry(lck, &node->li_group, l_sl_policy) { + /* Don't send glimpse ASTs to liblustre clients. + * They aren't listening for them, and they do + * entirely synchronous I/O anyways. */ + if (lck->l_export == NULL || + lck->l_export->exp_libclient == 1) + continue; + + if (*arg->liblustre) + *arg->liblustre = 0; + + if (*v == NULL) { + *v = LDLM_LOCK_GET(lck); + } else if ((*v)->l_policy_data.l_extent.start < + lck->l_policy_data.l_extent.start) { + LDLM_LOCK_PUT(*v); + *v = LDLM_LOCK_GET(lck); + } + + /* the same policy group - every lock has the + * same extent, so needn't do it any more */ + break; + } + + return INTERVAL_ITER_CONT; +} + static int filter_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data) { - struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + CFS_LIST_HEAD(rpc_list); struct ptlrpc_request *req = req_cookie; struct ldlm_lock *lock = *lockp, *l = NULL; struct ldlm_resource *res = lock->l_resource; ldlm_processing_policy policy; struct ost_lvb *res_lvb, *reply_lvb; struct ldlm_reply *rep; - struct list_head *tmp; ldlm_error_t err; - int rc, tmpflags = 0, only_liblustre = 0; + int idx, rc, tmpflags = 0, only_liblustre = 1; + struct ldlm_interval_tree *tree; + struct filter_intent_args arg; int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*rep), [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb) }; @@ -1602,7 +1678,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns, /* If we grant any lock at all, it will be a whole-file read lock. * Call the extent policy function to see if our request can be - * granted, or is blocked. */ + * granted, or is blocked. + * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse lock + */ lock->l_policy_data.l_extent.start = 0; lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; lock->l_req_mode = LCK_PR; @@ -1650,42 +1728,23 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LASSERT(res_lvb != NULL); *reply_lvb = *res_lvb; - list_for_each(tmp, &res->lr_granted) { - struct ldlm_lock *tmplock = - list_entry(tmp, struct ldlm_lock, l_res_link); - - if (tmplock->l_granted_mode == LCK_PR) - continue; - /* - * ->ns_lock guarantees that no new locks are granted, and, - * therefore, that res->lr_lvb_data cannot increase beyond the - * end of already granted lock. As a result, it is safe to - * check against "stale" reply_lvb->lvb_size value without - * res->lr_lvb_sem. - */ - if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size) - continue; - - /* Don't send glimpse ASTs to liblustre clients. They aren't - * listening for them, and they do entirely synchronous I/O - * anyways. */ - if (tmplock->l_export == NULL || - tmplock->l_export->exp_libclient == 1) { - only_liblustre = 1; - continue; - } - - if (l == NULL) { - l = LDLM_LOCK_GET(tmplock); - continue; - } - - if (l->l_policy_data.l_extent.start > - tmplock->l_policy_data.l_extent.start) + /* + * ->ns_lock guarantees that no new locks are granted, and, + * therefore, that res->lr_lvb_data cannot increase beyond the + * end of already granted lock. As a result, it is safe to + * check against "stale" reply_lvb->lvb_size value without + * res->lr_lvb_sem. + */ + arg.size = reply_lvb->lvb_size; + arg.victim = &l; + arg.liblustre = &only_liblustre; + for (idx = 0; idx < LCK_MODE_NUM; idx++) { + tree = &res->lr_itree[idx]; + if (tree->lit_mode == LCK_PR) continue; - LDLM_LOCK_PUT(l); - l = LDLM_LOCK_GET(tmplock); + interval_iterate_reverse(tree->lit_root, + filter_intent_cb, &arg); } unlock_res(res); @@ -1929,18 +1988,21 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, spin_lock_init(&filter->fo_translock); spin_lock_init(&filter->fo_objidlock); - INIT_LIST_HEAD(&filter->fo_export_list); + CFS_INIT_LIST_HEAD(&filter->fo_export_list); sema_init(&filter->fo_alloc_lock, 1); init_brw_stats(&filter->fo_filter_stats); filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE; filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT; filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; - INIT_LIST_HEAD(&filter->fo_llog_list); + CFS_INIT_LIST_HEAD(&filter->fo_llog_list); spin_lock_init(&filter->fo_llog_list_lock); + filter->fo_sptlrpc_lock = RW_LOCK_UNLOCKED; + sptlrpc_rule_set_init(&filter->fo_sptlrpc_rset); + filter->fo_fl_oss_capa = 0; - INIT_LIST_HEAD(&filter->fo_capa_keys); + CFS_INIT_LIST_HEAD(&filter->fo_capa_keys); filter->fo_capa_hash = init_capa_hash(); if (filter->fo_capa_hash == NULL) GOTO(err_ops, rc = -ENOMEM); @@ -1957,7 +2019,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "filter_ldlm_cb_client", &obd->obd_ldlm_client); - rc = llog_cat_initialize(obd, NULL, 1, NULL); + rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL); if (rc) { CERROR("failed to setup llogging subsystems\n"); GOTO(err_post, rc); @@ -2015,7 +2077,8 @@ err_mntput: static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { struct lprocfs_static_vars lvars; - unsigned long page; + unsigned long addr; + struct page *page; int rc; CLASSERT(offsetof(struct obd_device, u.obt) == @@ -2025,13 +2088,15 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) RETURN(-EINVAL); /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */ - page = get_zeroed_page(GFP_KERNEL); + OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); if (!page) RETURN(-ENOMEM); + addr = (unsigned long)cfs_page_address(page); + clear_page((void *)addr); /* lprocfs must be setup before the filter so state can be safely added * to /proc incrementally as the filter is setup */ - lprocfs_init_vars(filter, &lvars); + lprocfs_filter_init_vars(&lvars); if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) { /* Init obdfilter private stats here */ @@ -2043,18 +2108,31 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) "write_bytes", "bytes"); lproc_filter_attach_seqstat(obd); - obd->obd_proc_exports = proc_mkdir("exports", - obd->obd_proc_entry); + obd->obd_proc_exports_entry = lprocfs_register("exports", + obd->obd_proc_entry, + NULL, NULL); + if (IS_ERR(obd->obd_proc_exports_entry)) { + rc = PTR_ERR(obd->obd_proc_exports_entry); + CERROR("error %d setting up lprocfs for %s\n", + rc, "exports"); + obd->obd_proc_exports_entry = NULL; + } } + if (obd->obd_proc_exports_entry) + lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", + lprocfs_nid_stats_clear_read, + lprocfs_nid_stats_clear_write, obd); - memcpy((void *)page, lustre_cfg_buf(lcfg, 4), + memcpy((void *)addr, lustre_cfg_buf(lcfg, 4), LUSTRE_CFG_BUFLEN(lcfg, 4)); - rc = filter_common_setup(obd, lcfg, (void *)page); - free_page(page); + rc = filter_common_setup(obd, lcfg, (void *)addr); + OBD_PAGE_FREE(page); if (rc) { - lprocfs_obd_cleanup(obd); + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); + lprocfs_free_per_client_stats(obd); lprocfs_free_obd_stats(obd); + lprocfs_obd_cleanup(obd); } return rc; @@ -2067,67 +2145,81 @@ static struct llog_operations filter_size_orig_logops = { lop_add: llog_obd_origin_add }; -static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs, +static int filter_llog_init(struct obd_device *obd, int group, struct obd_device *tgt, int count, struct llog_catid *catid, struct obd_uuid *uuid) { + struct filter_obd *filter = &obd->u.filter; + struct obd_llog_group *olg; struct llog_ctxt *ctxt; int rc; ENTRY; + olg = filter_find_olg(obd, group); + if (IS_ERR(olg)) + RETURN(PTR_ERR(olg)); + + if (group == OBD_LLOG_GROUP) { + LASSERT(filter->fo_lcm == NULL); + OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master)); + if (!filter->fo_lcm) + RETURN(-ENOMEM); + + rc = llog_init_commit_master((struct llog_commit_master *) + filter->fo_lcm); + if (rc) + GOTO(cleanup, rc); + filter_mds_ost_repl_logops = llog_client_ops; filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel; filter_mds_ost_repl_logops.lop_connect = llog_repl_connect; filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync; - - rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL, + } else { + LASSERT(filter->fo_lcm != NULL); + } + rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL, &filter_mds_ost_repl_logops); if (rc) - RETURN(rc); + GOTO(cleanup, rc); /* FIXME - assign unlink_cb for filter's recovery */ - if (!llogs) - ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT); - else - ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT); + LASSERT(olg); + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); LASSERT(ctxt != NULL); ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb; + ctxt->loc_lcm = obd->u.filter.fo_lcm; + rc = llog_start_commit_thread(ctxt->loc_lcm); + llog_ctxt_put(ctxt); + if (rc) + GOTO(cleanup, rc); - rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL, + rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL, &filter_size_orig_logops); - RETURN(rc); -} - -static int filter_group_llog_cleanup(struct llog_ctxt *ctxt) -{ - int rc = 0; - ENTRY; - - if (CTXTP(ctxt, cleanup)) - rc = CTXTP(ctxt, cleanup)(ctxt); - - if (ctxt->loc_exp) - class_export_put(ctxt->loc_exp); - OBD_FREE(ctxt, sizeof(*ctxt)); +cleanup: + if (rc) { + llog_cleanup_commit_master(filter->fo_lcm, 0); + OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master)); + filter->fo_lcm = NULL; + } RETURN(rc); } -static int filter_group_llog_finish(struct obd_llogs *llogs) +static int filter_group_llog_finish(struct obd_llog_group *olg) { struct llog_ctxt *ctxt; int rc = 0, rc2 = 0; ENTRY; - ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT); + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); if (ctxt) - rc = filter_group_llog_cleanup(ctxt); + rc = llog_cleanup(ctxt); - ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT); + ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT); if (ctxt) - rc2 = filter_group_llog_cleanup(ctxt); + rc2 = llog_cleanup(ctxt); if (!rc) rc = rc2; @@ -2136,89 +2228,66 @@ static int filter_group_llog_finish(struct obd_llogs *llogs) static int filter_llog_finish(struct obd_device *obd, int count) { - struct llog_ctxt *ctxt; - int rc = 0, rc2 = 0; + int rc; ENTRY; - ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); - - ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT); - if (ctxt) - rc2 = llog_cleanup(ctxt); - if (!rc) - rc = rc2; + if (obd->u.filter.fo_lcm) { + llog_cleanup_commit_master((struct llog_commit_master *) + obd->u.filter.fo_lcm, 0); + OBD_FREE(obd->u.filter.fo_lcm, + sizeof(struct llog_commit_master)); + obd->u.filter.fo_lcm = NULL; + } + /* finish obd llog group */ + rc = filter_group_llog_finish(&obd->obd_olg); RETURN(rc); } -struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group, - struct obd_export *export) +struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) { - struct filter_group_llog *fglog, *nlog; + struct obd_llog_group *olg, *nolg; struct filter_obd *filter; - struct llog_ctxt *ctxt; - struct list_head *cur; int rc; filter = &obd->u.filter; + if (group == OBD_LLOG_GROUP) + RETURN(&obd->obd_olg); + spin_lock(&filter->fo_llog_list_lock); - list_for_each(cur, &filter->fo_llog_list) { - fglog = list_entry(cur, struct filter_group_llog, list); - if (fglog->group == group) { - if (!(fglog->exp == NULL || fglog->exp == export || export == NULL)) - CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n", - obd->obd_name, group, fglog->exp, export); + list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { + if (olg->olg_group == group) { spin_unlock(&filter->fo_llog_list_lock); - goto init; + RETURN(olg); } } spin_unlock(&filter->fo_llog_list_lock); - if (export == NULL) - RETURN(NULL); - - OBD_ALLOC_PTR(fglog); - if (fglog == NULL) - RETURN(NULL); - fglog->group = group; - - OBD_ALLOC_PTR(fglog->llogs); - if (fglog->llogs == NULL) { - OBD_FREE_PTR(fglog); - RETURN(NULL); - } + OBD_ALLOC_PTR(olg); + if (olg == NULL) + RETURN(ERR_PTR(-ENOMEM)); + llog_group_init(olg, group); spin_lock(&filter->fo_llog_list_lock); - list_for_each(cur, &filter->fo_llog_list) { - nlog = list_entry(cur, struct filter_group_llog, list); - LASSERT(nlog->group != group); + list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) { + LASSERT(nolg->olg_group != group); } - list_add(&fglog->list, &filter->fo_llog_list); + list_add(&olg->olg_list, &filter->fo_llog_list); spin_unlock(&filter->fo_llog_list_lock); - rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL); + rc = llog_cat_initialize(obd, olg, 1, NULL); if (rc) { - OBD_FREE_PTR(fglog->llogs); - OBD_FREE_PTR(fglog); - RETURN(NULL); - } - -init: - if (export) { - fglog->exp = export; - ctxt = llog_get_context_from_llogs(fglog->llogs, - LLOG_MDS_OST_REPL_CTXT); - LASSERT(ctxt != NULL); - - llog_receptor_accept(ctxt, export->exp_imp_reverse); + spin_lock(&filter->fo_llog_list_lock); + list_del(&olg->olg_list); + spin_unlock(&filter->fo_llog_list_lock); + OBD_FREE_PTR(olg); + RETURN(ERR_PTR(rc)); } - CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", - obd->obd_name, fglog->llogs, group); + CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n", + obd->obd_name, group, olg); - RETURN(fglog->llogs); + RETURN(olg); } static int filter_llog_connect(struct obd_export *exp, @@ -2226,7 +2295,7 @@ static int filter_llog_connect(struct obd_export *exp, { struct obd_device *obd = exp->exp_obd; struct llog_ctxt *ctxt; - struct obd_llogs *llog; + struct obd_llog_group *olg; int rc; ENTRY; @@ -2235,13 +2304,17 @@ static int filter_llog_connect(struct obd_export *exp, (unsigned) body->lgdc_logid.lgl_oid, (unsigned) body->lgdc_logid.lgl_ogen); - llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp); - LASSERT(llog != NULL); - ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx); + olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr); + if (IS_ERR(olg)) + RETURN(PTR_ERR(olg)); + llog_group_set_export(olg, exp); + + ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx); LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n", body->lgdc_ctxt_idx); rc = llog_connect(ctxt, 1, &body->lgdc_logid, &body->lgdc_gen, NULL); + llog_ctxt_put(ctxt); if (rc != 0) CERROR("failed to connect rc %d idx %d\n", rc, body->lgdc_ctxt_idx); @@ -2251,33 +2324,32 @@ static int filter_llog_connect(struct obd_export *exp, static int filter_llog_preclean (struct obd_device *obd) { - struct filter_group_llog *log; + struct obd_llog_group *olg; struct filter_obd *filter; int rc = 0; ENTRY; + rc = obd_llog_finish(obd, 0); + if (rc) + CERROR("failed to cleanup llogging subsystem\n"); + filter = &obd->u.filter; spin_lock(&filter->fo_llog_list_lock); while (!list_empty(&filter->fo_llog_list)) { - log = list_entry(filter->fo_llog_list.next, - struct filter_group_llog, list); - list_del(&log->list); + olg = list_entry(filter->fo_llog_list.next, + struct obd_llog_group, olg_list); + list_del(&olg->olg_list); spin_unlock(&filter->fo_llog_list_lock); - rc = filter_group_llog_finish(log->llogs); + rc = filter_group_llog_finish(olg); if (rc) CERROR("failed to cleanup llogging subsystem for %u\n", - log->group); - OBD_FREE_PTR(log->llogs); - OBD_FREE_PTR(log); + olg->olg_group); + OBD_FREE_PTR(olg); spin_lock(&filter->fo_llog_list_lock); } spin_unlock(&filter->fo_llog_list_lock); - rc = obd_llog_finish(obd, 0); - if (rc) - CERROR("failed to cleanup llogging subsystem\n"); - RETURN(rc); } @@ -2292,9 +2364,9 @@ static int filter_precleanup(struct obd_device *obd, break; case OBD_CLEANUP_EXPORTS: target_cleanup_recovery(obd); + rc = filter_llog_preclean(obd); break; case OBD_CLEANUP_SELF_EXP: - rc = filter_llog_preclean(obd); break; case OBD_CLEANUP_OBD: break; @@ -2320,8 +2392,10 @@ static int filter_cleanup(struct obd_device *obd) } } - lprocfs_obd_cleanup(obd); + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); + lprocfs_free_per_client_stats(obd); lprocfs_free_obd_stats(obd); + lprocfs_obd_cleanup(obd); lquota_cleanup(filter_quota_interface_ref, obd); /* Stop recovery before namespace cleanup. */ @@ -2330,14 +2404,15 @@ static int filter_cleanup(struct obd_device *obd) ldlm_namespace_free(obd->obd_namespace, obd->obd_force); + sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset); + if (obd->u.obt.obt_sb == NULL) RETURN(0); filter_post(obd); - shrink_dcache_parent(obd->u.obt.obt_sb->s_root); - LL_DQUOT_OFF(obd->u.obt.obt_sb); + shrink_dcache_sb(obd->u.obt.obt_sb); server_put_mount(obd->obd_name, filter->fo_vfsmnt); obd->u.obt.obt_sb = NULL; @@ -2367,6 +2442,14 @@ static int filter_connect_internal(struct obd_export *exp, exp->exp_connect_flags = data->ocd_connect_flags; data->ocd_version = LUSTRE_VERSION_CODE; + if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) { + CWARN("%s: OST requires FID support (flag="LPX64 + "), but client not\n", + exp->exp_obd->obd_name, + exp->exp_connect_flags); + RETURN(-EBADF); + } + if (exp->exp_connect_flags & OBD_CONNECT_GRANT) { struct filter_export_data *fed = &exp->exp_filter_data; obd_size left, want; @@ -2415,6 +2498,30 @@ static int filter_connect_internal(struct obd_export *exp, LASSERT(data->ocd_brw_size); } + if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) { + __u32 cksum_types = data->ocd_cksum_types; + + /* The client set in ocd_cksum_types the checksum types it + * supports. We have to mask off the algorithms that we don't + * support */ + if (cksum_types & OBD_CKSUM_ALL) + data->ocd_cksum_types &= OBD_CKSUM_ALL; + else + data->ocd_cksum_types = OBD_CKSUM_CRC32; + + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " + "%x\n", exp->exp_obd->obd_name, + obd_export_nid2str(exp), cksum_types, + data->ocd_cksum_types); + } else { + /* This client does not support OBD_CONNECT_CKSUM + * fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support " + "OBD_CONNECT_CKSUM, CRC32 will be used\n", + exp->exp_obd->obd_name, + obd_export_nid2str(exp)); + } + /* FIXME: Do the same with the MDS UUID and fsd_peeruuid. * FIXME: We don't strictly need the COMPAT flag for that, * FIXME: as fsd_peeruuid[0] will tell us if that is set. @@ -2423,7 +2530,8 @@ static int filter_connect_internal(struct obd_export *exp, RETURN(0); } -static int filter_reconnect(struct obd_export *exp, struct obd_device *obd, +static int filter_reconnect(const struct lu_env *env, + struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data) { @@ -2442,7 +2550,7 @@ static int filter_reconnect(struct obd_export *exp, struct obd_device *obd, static int filter_connect(const struct lu_env *env, struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, - struct obd_connect_data *data) + struct obd_connect_data *data, void *localdata) { struct lvfs_run_ctxt saved; struct obd_export *exp; @@ -2467,7 +2575,7 @@ static int filter_connect(const struct lu_env *env, if (rc) GOTO(cleanup, rc); - filter_export_stats_init(obd, exp); + filter_export_stats_init(obd, exp, localdata); group = data->ocd_group; if (obd->obd_replayable) { OBD_ALLOC(fcd, sizeof(*fcd)); @@ -2642,6 +2750,10 @@ static int filter_destroy_export(struct obd_export *exp) exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, exp->exp_filter_data.fed_pending); + /* Not ported yet the b1_6 quota functionality + * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); + */ + target_destroy_export(exp); if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) @@ -2665,7 +2777,7 @@ static int filter_destroy_export(struct obd_export *exp) static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) { - struct filter_group_llog *fglog, *nlog; + struct obd_llog_group *olg_min, *olg; struct filter_obd *filter; int worked = 0, group; struct llog_ctxt *ctxt; @@ -2678,35 +2790,41 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) * group order and skip already synced llogs -bzzz */ do { /* look for group with min. number, but > worked */ - fglog = NULL; + olg_min = NULL; group = 1 << 30; spin_lock(&filter->fo_llog_list_lock); - list_for_each_entry(nlog, &filter->fo_llog_list, list) { - if (nlog->group <= worked) { + list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { + if (olg->olg_group <= worked) { /* this group is already synced */ continue; } - if (group < nlog->group) { + if (group < olg->olg_group) { /* we have group with smaller number to sync */ continue; } /* store current minimal group */ - fglog = nlog; - group = nlog->group; + olg_min = olg; + group = olg->olg_group; } spin_unlock(&filter->fo_llog_list_lock); - if (fglog == NULL) + if (olg_min == NULL) break; - worked = fglog->group; - if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) { - ctxt = llog_get_context_from_llogs(fglog->llogs, + worked = olg_min->olg_group; + if (olg_min->olg_exp && + (dexp == olg_min->olg_exp || dexp == NULL)) { + int err; + ctxt = llog_group_get_ctxt(olg_min, LLOG_MDS_OST_REPL_CTXT); LASSERT(ctxt != NULL); - llog_sync(ctxt, fglog->exp); + err = llog_sync(ctxt, olg_min->olg_exp); + llog_ctxt_put(ctxt); + if (err) + CERROR("error flushing logs to MDS: rc %d\n", + err); } - } while (fglog != NULL); + } while (olg_min != NULL); } /* also incredibly similar to mds_disconnect */ @@ -2769,8 +2887,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa, dchild = filter_fid2dentry(obd, NULL, group, oa->o_id); if (IS_ERR(dchild)) { - CERROR("%s error looking up object: "LPU64"\n", - what, oa->o_id); + CERROR("%s error looking up object: "LPU64":"LPU64"\n", + what, group, oa->o_id); RETURN(dchild); } @@ -3016,6 +3134,26 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, if (rc) RETURN(rc); + /* This would be very bad - accidentally truncating a file when + * changing the time or similar - bug 12203. */ + if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE && + oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) { + static char mdsinum[48]; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) + snprintf(mdsinum, sizeof(mdsinum) - 1, + " of inode "LPU64"/%u", oinfo->oi_oa->o_fid, + oinfo->oi_oa->o_generation); + else + mdsinum[0] = '\0'; + + CERROR("%s: setattr from %s trying to truncate objid "LPU64 + " %s\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), + oinfo->oi_oa->o_id, mdsinum); + RETURN(-EPERM); + } + dentry = __filter_oa2dentry(exp->exp_obd, oinfo->oi_oa, __FUNCTION__, 1); if (IS_ERR(dentry)) @@ -3252,7 +3390,7 @@ out: } static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age) + __u64 max_age, __u32 flags) { struct filter_obd *filter = &obd->u.filter; int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; @@ -3348,7 +3486,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, OBD_ALLOC(osfs, sizeof(*osfs)); if (osfs == NULL) RETURN(-ENOMEM); - rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ); + rc = filter_statfs(obd, osfs, cfs_time_current_64() - HZ, 0); if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { CDEBUG(D_RPCTRACE,"%s: not enough space for create " LPU64"\n", obd->obd_name, osfs->os_bavail << @@ -3596,9 +3734,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, oa->o_id); /* If object already gone, cancel cookie right now */ if (oa->o_valid & OBD_MD_FLCOOKIE) { + struct llog_ctxt *ctxt; + struct obd_llog_group *olg; fcc = obdo_logcookie(oa); - llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1), - NULL, 1, fcc, 0); + olg = filter_find_olg(obd, oa->o_gr); + if (IS_ERR(olg)) + GOTO(cleanup, rc = PTR_ERR(olg)); + llog_group_set_export(olg, exp); + + ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1); + llog_cancel(ctxt, NULL, 1, fcc, 0); + llog_ctxt_put(ctxt); fcc = NULL; /* we didn't allocate fcc, don't free it */ } GOTO(cleanup, rc = -ENOENT); @@ -3751,7 +3897,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, struct lvfs_run_ctxt saved; struct filter_obd *filter; struct dentry *dentry; - struct llog_ctxt *ctxt; int rc, rc2; ENTRY; @@ -3766,8 +3911,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, if (!oa || !(oa->o_valid & OBD_MD_FLID)) { rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb); /* flush any remaining cancel messages out to the target */ - ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT); - llog_sync(ctxt, exp); + filter_sync_llogs(exp->exp_obd, exp); RETURN(rc); } @@ -3814,28 +3958,41 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen, RETURN(-EINVAL); } - if (keylen == strlen("blocksize") && - memcmp(key, "blocksize", keylen) == 0) { + if (KEY_IS("blocksize")) { __u32 *blocksize = val; + if (blocksize) { + if (*vallen < sizeof(*blocksize)) + RETURN(-EOVERFLOW); + *blocksize = obd->u.obt.obt_sb->s_blocksize; + } *vallen = sizeof(*blocksize); - *blocksize = obd->u.obt.obt_sb->s_blocksize; RETURN(0); } - if (keylen == strlen("blocksize_bits") && - memcmp(key, "blocksize_bits", keylen) == 0) { + if (KEY_IS("blocksize_bits")) { __u32 *blocksize_bits = val; + if (blocksize_bits) { + if (*vallen < sizeof(*blocksize_bits)) + RETURN(-EOVERFLOW); + *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits; + } *vallen = sizeof(*blocksize_bits); - *blocksize_bits = obd->u.obt.obt_sb->s_blocksize_bits; RETURN(0); } - if (keylen >= strlen("last_id") && memcmp(key, "last_id", 7) == 0) { + if (KEY_IS("last_id")) { obd_id *last_id = val; /* FIXME: object groups */ - *last_id = filter_last_id(&obd->u.filter, 0); + if (last_id) { + if (*vallen < sizeof(*last_id)) + RETURN(-EOVERFLOW); + *last_id = filter_last_id(&obd->u.filter, + exp->exp_filter_data.fed_group); + } + *vallen = sizeof(*last_id); RETURN(0); } + CDEBUG(D_IOCTL, "invalid key\n"); RETURN(-EINVAL); } @@ -3845,7 +4002,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen, struct ptlrpc_request_set *set) { struct obd_device *obd; - struct obd_llogs *llog; + struct obd_llog_group *olg; struct llog_ctxt *ctxt; int rc = 0, group; ENTRY; @@ -3868,8 +4025,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen, RETURN(0); } - if (keylen < strlen(KEY_MDS_CONN) || - memcmp(key, KEY_MDS_CONN, keylen) != 0) + if (!KEY_IS(KEY_MDS_CONN)) RETURN(-EINVAL); LCONSOLE_WARN("%s: received MDS connection from %s\n", obd->obd_name, @@ -3881,12 +4037,16 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen, group = (int)(*(__u32 *)val); LASSERT(group >= FILTER_GROUP_MDS0); - llog = filter_grab_llog_for_group(obd, group, exp); - LASSERT(llog != NULL); - ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT); - LASSERTF(ctxt != NULL, "ctxt is not null\n"), + olg = filter_find_olg(obd, group); + if (IS_ERR(olg)) + RETURN(PTR_ERR(olg)); + llog_group_set_export(olg, exp); + + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); + LASSERTF(ctxt != NULL, "ctxt is null\n"), rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); + llog_ctxt_put(ctxt); lquota_setinfo(filter_quota_interface_ref, exp, obd); @@ -3995,9 +4155,43 @@ static int filter_process_config(struct obd_device *obd, obd_count len, struct lprocfs_static_vars lvars; int rc = 0; - lprocfs_init_vars(filter, &lvars); + switch (lcfg->lcfg_command) { + case LCFG_SPTLRPC_CONF: { + struct filter_obd *filter = &obd->u.filter; + struct sptlrpc_conf_log *log; + struct sptlrpc_rule_set tmp_rset; + + log = sptlrpc_conf_log_extract(lcfg); + if (IS_ERR(log)) { + rc = PTR_ERR(log); + break; + } + + sptlrpc_rule_set_init(&tmp_rset); + + rc = sptlrpc_rule_set_from_log(&tmp_rset, log); + if (rc) { + CERROR("obd %s: failed get sptlrpc rules: %d\n", + obd->obd_name, rc); + break; + } + + write_lock(&filter->fo_sptlrpc_lock); + sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset); + filter->fo_sptlrpc_rset = tmp_rset; + write_unlock(&filter->fo_sptlrpc_lock); + + sptlrpc_target_update_exp_flavor(obd, &tmp_rset); + break; + } + default: + lprocfs_filter_init_vars(&lvars); + + rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, + lcfg, obd); + break; + } - rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, lcfg, obd); return rc; } @@ -4045,7 +4239,7 @@ static int __init obdfilter_init(void) struct lprocfs_static_vars lvars; int rc; - lprocfs_init_vars(filter, &lvars); + lprocfs_filter_init_vars(&lvars); request_module("lquota"); OBD_ALLOC(obdfilter_created_scratchpad,