X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter.c;h=6f2b7b9be17949bc1a54d11eb4f1dc61d00592ad;hp=e1442544dfe9bbf7b7f5b9a3a1b254faab109bba;hb=8da33c6cc5192303fcd18f45892e1f115004e662;hpb=ba85c40a877d7f12593533d3066609d41c6bdf36 diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index e144254..6f2b7b9 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -26,10 +26,13 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* + * Copyright (c) 2011 Whamcloud, Inc. + */ +/* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. * @@ -152,14 +155,25 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode, if (oti->oti_transno == 0) { last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1; lsd->lsd_last_transno = cpu_to_le64(last_rcvd); + LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno)); } else { last_rcvd = oti->oti_transno; if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno)) lsd->lsd_last_transno = cpu_to_le64(last_rcvd); + if (unlikely(last_rcvd < le64_to_cpu(lcd->lcd_last_transno))) { + CERROR("Trying to overwrite bigger transno, on-disk: " + LPU64", new: "LPU64"\n", + le64_to_cpu(lcd->lcd_last_transno), last_rcvd); + cfs_spin_lock(&exp->exp_lock); + exp->exp_vbr_failed = 1; + cfs_spin_unlock(&exp->exp_lock); + cfs_spin_unlock(&obt->obt_lut->lut_translock); + cfs_mutex_up(&ted->ted_lcd_lock); + RETURN(-EOVERFLOW); + } } oti->oti_transno = last_rcvd; - LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno)); lcd->lcd_last_transno = cpu_to_le64(last_rcvd); lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version); lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid); @@ -289,7 +303,6 @@ static int filter_export_stats_init(struct obd_device *obd, RETURN(0); clean: - lprocfs_exp_cleanup(exp); return rc; } @@ -880,9 +893,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) exp->exp_connecting = 0; exp->exp_in_recovery = 0; cfs_spin_unlock(&exp->exp_lock); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_max_recoverable_clients++; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); class_export_put(exp); if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno)) @@ -892,8 +903,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) obd->obd_last_committed = le64_to_cpu(lsd->lsd_last_transno); out: - lut->lut_mount_count = mount_count + 1; - lsd->lsd_mount_count = cpu_to_le64(lut->lut_mount_count); + obd->u.obt.obt_mount_count = mount_count + 1; + obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count; + lsd->lsd_mount_count = cpu_to_le64(obd->u.obt.obt_mount_count); /* save it, so mount count and last_transno is current */ rc = filter_update_server_data(obd); @@ -1197,12 +1209,12 @@ static int filter_prep_groups(struct obd_device *obd) loff_t off = 0; ENTRY; - O_dentry = simple_mkdir(current->fs->pwd, obd->u.obt.obt_vfsmnt, + O_dentry = simple_mkdir(cfs_fs_pwd(current->fs), obd->u.obt.obt_vfsmnt, "O", 0700, 1); - CDEBUG(D_INODE, "got/created O: %p\n", O_dentry); + CDEBUG(D_INODE, "%s: got/created O: %p\n", obd->obd_name, O_dentry); if (IS_ERR(O_dentry)) { rc = PTR_ERR(O_dentry); - CERROR("cannot open/create O: rc = %d\n", rc); + CERROR("%s: cannot open/create O: rc = %d\n", obd->obd_name,rc); GOTO(cleanup, rc); } filter->fo_dentry_O = O_dentry; @@ -1212,22 +1224,24 @@ static int filter_prep_groups(struct obd_device *obd) * clients because they may send create/destroy for any group -bzzz */ filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700); if (IS_ERR(filp)) { - CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp)); + CERROR("%s: cannot create LAST_GROUP: rc = %ld\n", + obd->obd_name, PTR_ERR(filp)); GOTO(cleanup, rc = PTR_ERR(filp)); } cleanup_phase = 2; /* filp */ rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off); if (rc) { - CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc); + CERROR("%s: error reading LAST_GROUP: rc %d\n", + obd->obd_name, rc); GOTO(cleanup, rc); } if (off == 0) last_group = FID_SEQ_OST_MDT0; - CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, - FID_SEQ_OST_MDT0, last_group); + CDEBUG(D_INODE, "%s: initialize group %u (max %u)\n", obd->obd_name, + FID_SEQ_OST_MDT0, last_group); filter->fo_committed_group = last_group; rc = filter_read_groups(obd, last_group, 1); if (rc) @@ -1408,7 +1422,9 @@ struct dentry *filter_parent(struct obd_device *obd, obd_seq group, obd_id objid { struct filter_obd *filter = &obd->u.filter; struct filter_subdirs *subdirs; - LASSERT(group < filter->fo_group_count); /* FIXME: object groups */ + + if (group >= filter->fo_group_count) /* FIXME: object groups */ + return ERR_PTR(-EBADF); if (!fid_seq_is_mdt(group) || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; @@ -1462,7 +1478,7 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, obd->u.filter.fo_destroys_in_progress == 0) { /* don't fail lookups for orphan recovery, it causes * later LBUGs when objects still exist during precreate */ - CDEBUG(D_INFO, "*** obd_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT); + CDEBUG(D_INFO, "*** cfs_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT); RETURN(ERR_PTR(-ENOENT)); } if (id == 0) { @@ -1482,12 +1498,13 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, } CDEBUG(D_INODE, "looking up object O/%.*s/%s\n", dparent->d_name.len, dparent->d_name.name, name); - dchild = /*ll_*/lookup_one_len(name, dparent, len); + /* dparent is already locked here, so we cannot use ll_lookup_one_len() */ + dchild = lookup_one_len(name, dparent, len); if (dir_dentry == NULL) filter_parent_unlock(dparent); if (IS_ERR(dchild)) { - CERROR("%s: child lookup error %ld\n", obd->obd_name, - PTR_ERR(dchild)); + CERROR("%s: object "LPU64":"LPU64" lookup error: rc %ld\n", + obd->obd_name, id, group, PTR_ERR(dchild)); RETURN(dchild); } @@ -1563,7 +1580,7 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry, GOTO(out, rc = -EPERM); /* check_sticky() */ - if ((dentry->d_inode->i_uid != current->fsuid && + if ((dentry->d_inode->i_uid != cfs_curproc_fsuid() && !cfs_capable(CFS_CAP_FOWNER)) || IS_APPEND(dentry->d_inode) || IS_IMMUTABLE(dentry->d_inode)) GOTO(out, rc = -EPERM); @@ -1571,7 +1588,7 @@ int filter_vfs_unlink(struct inode *dir, struct dentry *dentry, /* NOTE: This might need to go outside i_mutex, though it isn't clear if * that was done because of journal_start (which is already done * here) or some other ordering issue. */ - DQUOT_INIT(dir); + ll_vfs_dq_init(dir); rc = ll_security_inode_unlink(dir, dentry, mnt); if (rc) @@ -1597,7 +1614,10 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid, struct inode *inode = dchild->d_inode; int rc; - if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) { + /* There should be 2 references to the inode: + * 1) taken by filter_prepare_destroy + * 2) taken by filter_destroy */ + if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 2) { CERROR("destroying objid %.*s ino %lu nlink %lu count %d\n", dchild->d_name.len, dchild->d_name.name, inode->i_ino, (unsigned long)inode->i_nlink, @@ -1701,7 +1721,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * lock, and should not be granted if the lock will be blocked. */ - LASSERT(ns == res->lr_namespace); + LASSERT(ns == ldlm_res_to_ns(res)); lock_res(res); rc = policy(lock, &tmpflags, 0, &err, &rpc_list); check_res_locked(res); @@ -1723,7 +1743,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, if (rc == LDLM_ITER_CONTINUE) { /* do not grant locks to the liblustre clients: they cannot * handle ASTs robustly. We need to do this while still - * holding ns_lock to avoid the lock remaining on the res_link + * holding lr_lock to avoid the lock remaining on the res_link * list (and potentially being added to l_pending_list by an * AST) when we are going to drop this lock ASAP. */ if (lock->l_export->exp_libclient || @@ -1746,7 +1766,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, *reply_lvb = *res_lvb; /* - * ->ns_lock guarantees that no new locks are granted, and, + * lr_lock guarantees that no new locks are granted, and, * therefore, that res->lr_lvb_data cannot increase beyond the * end of already granted lock. As a result, it is safe to * check against "stale" reply_lvb->lvb_size value without @@ -1799,13 +1819,6 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l); rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ - /* Update the LVB from disk if the AST failed (this is a legal race) */ - /* - * XXX nikita: situation when ldlm_server_glimpse_ast() failed before - * sending ast is not handled. This can result in lost client writes. - */ - if (rc != 0) - ldlm_res_lvbo_update(res, NULL, 1); lock_res(res); *reply_lvb = *res_lvb; @@ -1954,7 +1967,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, __u8 *uuid_ptr; char *str, *label; char ns_name[48]; - request_queue_t *q; + struct request_queue *q; int rc, i; ENTRY; @@ -1970,14 +1983,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb); mnt = lmi->lmi_mnt; obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); - - /* gets recovery timeouts from mount data */ - if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft) - obd->obd_recovery_timeout = - lsi->lsi_lmd->lmd_recovery_time_soft; - if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard) - obd->obd_recovery_time_hard = - lsi->lsi_lmd->lmd_recovery_time_hard; } else { /* old path - used by lctl */ CERROR("Using old MDS mount method\n"); @@ -2009,6 +2014,9 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, /* failover is the default */ obd->obd_replayable = 1; + /* disable connection until configuration finishes */ + obd->obd_no_conn = 1; + if (lcfg->lcfg_bufcount > 3 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { str = lustre_cfg_string(lcfg, 3); if (strchr(str, 'n')) { @@ -2017,6 +2025,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, } } + obd->u.obt.obt_magic = OBT_MAGIC; obd->u.obt.obt_vfsmnt = mnt; obd->u.obt.obt_sb = mnt->mnt_sb; filter->fo_fstype = mnt->mnt_sb->s_type->name; @@ -2040,11 +2049,14 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, CFS_INIT_LIST_HEAD(&filter->fo_export_list); cfs_sema_init(&filter->fo_alloc_lock, 1); init_brw_stats(&filter->fo_filter_stats); + cfs_spin_lock_init(&filter->fo_flags_lock); filter->fo_read_cache = 1; /* enable read-only cache by default */ filter->fo_writethrough_cache = 1; /* enable writethrough cache */ filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE; filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT; filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; + filter->fo_syncjournal = 0; /* Don't sync journals on i/o by default */ + filter_slc_set(filter); /* initialize sync on lock cancel */ rc = filter_prep(obd); if (rc) @@ -2061,8 +2073,10 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, GOTO(err_post, rc = -ENOMEM); sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER, - LDLM_NAMESPACE_GREEDY); + obd->obd_namespace = ldlm_namespace_new(obd, ns_name, + LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY, + LDLM_NS_TYPE_OST); if (obd->obd_namespace == NULL) GOTO(err_post, rc = -ENOMEM); obd->obd_namespace->ns_lvbp = obd; @@ -2088,13 +2102,13 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, GOTO(err_post, rc); q = bdev_get_queue(mnt->mnt_sb->s_bdev); - if (q->max_sectors < q->max_hw_sectors && - q->max_sectors < PTLRPC_MAX_BRW_SIZE >> 9) + if (queue_max_sectors(q) < queue_max_hw_sectors(q) && + queue_max_sectors(q) < PTLRPC_MAX_BRW_SIZE >> 9) LCONSOLE_INFO("%s: underlying device %s should be tuned " "for larger I/O requests: max_sectors = %u " "could be up to max_hw_sectors=%u\n", obd->obd_name, mnt->mnt_sb->s_id, - q->max_sectors, q->max_hw_sectors); + queue_max_sectors(q), queue_max_hw_sectors(q)); uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb); if (uuid_ptr != NULL) { @@ -2110,17 +2124,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, lmi ? s2lsi(lmi->lmi_sb)->lsi_lmd->lmd_dev : "", obd->obd_replayable ? "enabled" : "disabled"); - if (obd->obd_recovering) - LCONSOLE_WARN("%s: Will be in recovery for at least %d:%.02d, " - "or until %d client%s reconnect%s\n", - obd->obd_name, - obd->obd_recovery_timeout / 60, - obd->obd_recovery_timeout % 60, - obd->obd_max_recoverable_clients, - (obd->obd_max_recoverable_clients == 1) ? "" : "s", - (obd->obd_max_recoverable_clients == 1) ? "s": ""); - - RETURN(0); err_post: @@ -2137,9 +2140,11 @@ err_mntput: static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { struct lprocfs_static_vars lvars; + cfs_proc_dir_entry_t *entry; unsigned long addr; struct page *page; int rc; + ENTRY; CLASSERT(offsetof(struct obd_device, u.obt) == offsetof(struct obd_device, u.filter.fo_obt)); @@ -2147,69 +2152,89 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) if (!LUSTRE_CFG_BUFLEN(lcfg, 1) || !LUSTRE_CFG_BUFLEN(lcfg, 2)) RETURN(-EINVAL); - /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */ - OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); - if (!page) - RETURN(-ENOMEM); - addr = (unsigned long)cfs_page_address(page); - clear_page((void *)addr); - /* lprocfs must be setup before the filter so state can be safely added * to /proc incrementally as the filter is setup */ lprocfs_filter_init_vars(&lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && - lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) { - /* Init obdfilter private stats here */ - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, - LPROCFS_CNTR_AVGMINMAX, - "read_bytes", "bytes"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, - LPROCFS_CNTR_AVGMINMAX, - "write_bytes", "bytes"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE, - LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV, - "get_page", "usec"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE, - LPROCFS_CNTR_AVGMINMAX, - "get_page_failures", "num"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, - LPROCFS_CNTR_AVGMINMAX, - "cache_access", "pages"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT, - LPROCFS_CNTR_AVGMINMAX, - "cache_hit", "pages"); - lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS, - LPROCFS_CNTR_AVGMINMAX, - "cache_miss", "pages"); - - lproc_filter_attach_seqstat(obd); - obd->obd_proc_exports_entry = lprocfs_register("exports", - obd->obd_proc_entry, - NULL, NULL); - if (IS_ERR(obd->obd_proc_exports_entry)) { - rc = PTR_ERR(obd->obd_proc_exports_entry); - CERROR("error %d setting up lprocfs for %s\n", - rc, "exports"); - obd->obd_proc_exports_entry = NULL; - } + rc = lprocfs_obd_setup(obd, lvars.obd_vars); + if (rc) { + CERROR("%s: lprocfs_obd_setup failed: %d.\n", + obd->obd_name, rc); + RETURN(rc); + } + + rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST); + if (rc) { + CERROR("%s: lprocfs_alloc_obd_stats failed: %d.\n", + obd->obd_name, rc); + GOTO(obd_cleanup, rc); + } + + /* Init obdfilter private stats here */ + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, + LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE, + LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV, + "get_page", "usec"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE, + LPROCFS_CNTR_AVGMINMAX, "get_page_failures", "num"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, + LPROCFS_CNTR_AVGMINMAX, "cache_access", "pages"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT, + LPROCFS_CNTR_AVGMINMAX, "cache_hit", "pages"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS, + LPROCFS_CNTR_AVGMINMAX, "cache_miss", "pages"); + + rc = lproc_filter_attach_seqstat(obd); + if (rc) { + CERROR("%s: create seqstat failed: %d.\n", obd->obd_name, rc); + GOTO(free_obd_stats, rc); + } + + entry = lprocfs_register("exports", obd->obd_proc_entry, NULL, NULL); + if (IS_ERR(entry)) { + rc = PTR_ERR(entry); + CERROR("%s: error %d setting up lprocfs for %s\n", + obd->obd_name, rc, "exports"); + GOTO(free_obd_stats, rc); } - if (obd->obd_proc_exports_entry) - lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", + obd->obd_proc_exports_entry = entry; + + entry = lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", lprocfs_nid_stats_clear_read, lprocfs_nid_stats_clear_write, obd, NULL); + if (IS_ERR(entry)) { + rc = PTR_ERR(entry); + CERROR("%s: add proc entry 'clear' failed: %d.\n", + obd->obd_name, rc); + GOTO(free_obd_stats, rc); + } + /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */ + OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); + if (!page) + GOTO(remove_entry_clear, rc = -ENOMEM); + addr = (unsigned long)cfs_page_address(page); + clear_page((void *)addr); memcpy((void *)addr, lustre_cfg_buf(lcfg, 4), LUSTRE_CFG_BUFLEN(lcfg, 4)); rc = filter_common_setup(obd, lcfg, (void *)addr); OBD_PAGE_FREE(page); - if (rc) { - lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); - lprocfs_free_per_client_stats(obd); - lprocfs_free_obd_stats(obd); - lprocfs_obd_cleanup(obd); + CERROR("%s: filter_common_setup failed: %d.\n", + obd->obd_name, rc); + GOTO(remove_entry_clear, rc); } + RETURN(0); + +remove_entry_clear: + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); +free_obd_stats: + lprocfs_free_obd_stats(obd); +obd_cleanup: + lprocfs_obd_cleanup(obd); return rc; } @@ -2430,7 +2455,7 @@ struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) */ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) { - struct obd_llog_group *olg = NULL; + struct obd_llog_group *olg = NULL, *olg_new = NULL; struct filter_obd *filter; int rc; @@ -2439,6 +2464,10 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) if (group == FID_SEQ_LLOG) RETURN(&obd->obd_olg); + OBD_ALLOC_PTR(olg_new); + if (olg_new == NULL) + RETURN(ERR_PTR(-ENOMEM)); + cfs_spin_lock(&filter->fo_llog_list_lock); olg = filter_find_olg_internal(filter, group); if (olg) { @@ -2447,10 +2476,11 @@ struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) } else { GOTO(out_unlock, olg); } + } else { + /* set as the newly allocated one */ + olg = olg_new; + olg_new = NULL; } - OBD_ALLOC_PTR(olg); - if (olg == NULL) - GOTO(out_unlock, olg = ERR_PTR(-ENOMEM)); llog_group_init(olg, group); cfs_list_add(&olg->olg_list, &filter->fo_llog_list); @@ -2475,7 +2505,9 @@ out: out_unlock: cfs_spin_unlock(&filter->fo_llog_list_lock); - GOTO(out, olg); + if (olg_new) + OBD_FREE_PTR(olg_new); + goto out; } static int filter_llog_connect(struct obd_export *exp, @@ -2507,9 +2539,9 @@ static int filter_llog_connect(struct obd_export *exp, obd->obd_name, body->lgdc_logid.lgl_oid, body->lgdc_logid.lgl_oseq, body->lgdc_logid.lgl_ogen); - cfs_spin_lock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->u.filter.fo_flags_lock); obd->u.filter.fo_mds_ost_sync = 1; - cfs_spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_unlock(&obd->u.filter.fo_flags_lock); rc = llog_connect(ctxt, &body->lgdc_logid, &body->lgdc_gen, NULL); llog_ctxt_put(ctxt); @@ -2602,7 +2634,7 @@ static int filter_cleanup(struct obd_device *obd) filter_post(obd); - LL_DQUOT_OFF(obd->u.obt.obt_sb); + ll_vfs_dq_off(obd->u.obt.obt_sb, 0); shrink_dcache_sb(obd->u.obt.obt_sb); server_put_mount(obd->obd_name, obd->u.obt.obt_vfsmnt); @@ -2636,7 +2668,7 @@ static int filter_connect_internal(struct obd_export *exp, CWARN("!!! This export (nid %s) used object group %d " "earlier; now it's trying to use group %d! This could " "be a bug in the MDS. Please report to " - "http://bugzilla.lustre.org/\n", + "http://bugs.whamcloud.com/\n", obd_export_nid2str(exp), fed->fed_group,data->ocd_group); RETURN(-EPROTO); } @@ -2647,8 +2679,10 @@ static int filter_connect_internal(struct obd_export *exp, data->ocd_version = LUSTRE_VERSION_CODE; /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */ - if (!ergo(data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN, - data->ocd_connect_flags & OBD_CONNECT_MDS)) + if (data->ocd_connect_flags & OBD_CONNECT_MDS) + CWARN("%s: Received MDS connection for group %u\n", + exp->exp_obd->obd_name, data->ocd_group); + else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) RETURN(-EPROTO); if (exp->exp_connect_flags & OBD_CONNECT_GRANT) { @@ -2702,7 +2736,18 @@ static int filter_connect_internal(struct obd_export *exp, } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); - LASSERT(data->ocd_brw_size); + if (data->ocd_brw_size == 0) { + CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 + " ocd_version: %x ocd_grant: %d ocd_index: %u " + "ocd_brw_size is unexpectedly zero, " + "network data corruption?" + "Refusing connection of this client\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, + exp, data->ocd_connect_flags, data->ocd_version, + data->ocd_grant, data->ocd_index); + RETURN(-EPROTO); + } } if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) { @@ -2711,9 +2756,10 @@ static int filter_connect_internal(struct obd_export *exp, /* The client set in ocd_cksum_types the checksum types it * supports. We have to mask off the algorithms that we don't * support */ - if (cksum_types & OBD_CKSUM_ALL) - data->ocd_cksum_types &= OBD_CKSUM_ALL; - else + data->ocd_cksum_types &= cksum_types_supported(); + + /* 1.6.4- only support CRC32 and didn't set ocd_cksum_types */ + if (unlikely(data->ocd_cksum_types == 0)) data->ocd_cksum_types = OBD_CKSUM_CRC32; CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " @@ -2729,6 +2775,9 @@ static int filter_connect_internal(struct obd_export *exp, obd_export_nid2str(exp)); } + if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES) + data->ocd_maxbytes = exp->exp_obd->u.obt.obt_sb->s_maxbytes; + RETURN(0); } @@ -2751,7 +2800,6 @@ static int filter_reconnect(const struct lu_env *env, RETURN(rc); } -/* nearly identical to mds_connect */ static int filter_connect(const struct lu_env *env, struct obd_export **exp, struct obd_device *obd, struct obd_uuid *cluuid, @@ -2760,7 +2808,6 @@ static int filter_connect(const struct lu_env *env, struct lvfs_run_ctxt saved; struct lustre_handle conn = { 0 }; struct obd_export *lexp; - __u32 group; int rc; ENTRY; @@ -2787,16 +2834,11 @@ static int filter_connect(const struct lu_env *env, GOTO(cleanup, rc); } - group = data->ocd_group; - - CWARN("%s: Received MDS connection ("LPX64"); group %d\n", - obd->obd_name, lexp->exp_handle.h_cookie, group); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = filter_read_groups(obd, group, 1); + rc = filter_read_groups(obd, data->ocd_group, 1); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc != 0) { - CERROR("can't read group %u\n", group); + CERROR("can't read group %u\n", data->ocd_group); GOTO(cleanup, rc); } @@ -2805,7 +2847,6 @@ static int filter_connect(const struct lu_env *env, cleanup: if (rc) { class_disconnect(lexp); - lprocfs_exp_cleanup(lexp); *exp = NULL; } else { *exp = lexp; @@ -3189,13 +3230,15 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, *fcc = oa->o_lcookie; } if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { - DQUOT_INIT(inode); + unsigned long now = jiffies; + ll_vfs_dq_init(inode); /* Filter truncates and writes are serialized by * i_alloc_sem, see the comment in * filter_preprw_write.*/ if (ia_valid & ATTR_SIZE) down_write(&inode->i_alloc_sem); LOCK_INODE_MUTEX(inode); + fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); old_size = i_size_read(inode); } @@ -3277,7 +3320,10 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, * we have two left for the last_rcvd and VBR inode version updates. */ err = fsfilt_extend(exp->exp_obd, inode, 2, handle); - rc = filter_finish_transno(exp, inode, oti, rc, sync); + /* Update inode version only if data has changed => size has changed */ + rc = filter_finish_transno(exp, ia_valid & ATTR_SIZE ? inode : NULL, + oti, rc, sync); + if (sync) { filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc); fcc = NULL; @@ -3392,7 +3438,9 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, */ if (oa->o_valid & (OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME)) { + unsigned long now = jiffies; down_write(&dentry->d_inode->i_alloc_sem); + fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem"); fmd = filter_fmd_get(exp, oa->o_id, oa->o_seq); if (fmd && fmd->fmd_mactime_xid < oti->oti_xid) fmd->fmd_mactime_xid = oti->oti_xid; @@ -3479,7 +3527,7 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, LASSERT((*lsmp)->lsm_object_id); } - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + (*lsmp)->lsm_maxbytes = exp->exp_obd->u.obt.obt_sb->s_maxbytes; RETURN(lsm_size); } @@ -3538,7 +3586,14 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, filter_set_last_id(filter, id, doa.o_seq); rc = filter_update_last_objid(exp->exp_obd, doa.o_seq, 1); } else { - /* don't reuse orphan object, return last used objid */ + /* + * We have destroyed orphan objects, but don't want to reuse + * them. Therefore we don't reset last_id to the last created + * objects. Instead, we report back to the MDS the object id + * of the last orphan, so that the MDS can restart allocating + * objects from this id + 1 and thus skip the whole orphan + * object id range + */ oa->o_id = last; rc = 0; } @@ -3663,11 +3718,10 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, struct lr_server_data *lsd = class_server_data(obd); int index = le32_to_cpu(lsd->lsd_ost_index); - if (obd_fail_val == -1 || - index == obd_fail_val) + if (cfs_fail_val == -1 || index == cfs_fail_val) osfs->os_bfree = osfs->os_bavail = 2; - else if (obd_fail_loc & OBD_FAIL_ONCE) - obd_fail_loc &= ~OBD_FAILED; /* reset flag */ + else if (cfs_fail_loc & OBD_FAIL_ONCE) + cfs_fail_loc &= ~OBD_FAILED; /* reset flag */ } /* set EROFS to state field if FS is mounted as RDONLY. The goal is to @@ -3738,6 +3792,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, struct dentry *dchild = NULL, *dparent = NULL; struct filter_obd *filter; struct obd_statfs *osfs; + struct iattr iattr; int err = 0, rc = 0, recreate_obj = 0, i; cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT/2); __u64 os_ffree; @@ -3758,13 +3813,21 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, OBD_ALLOC(osfs, sizeof(*osfs)); if (osfs == NULL) RETURN(-ENOMEM); - rc = filter_statfs(obd, osfs, cfs_time_current_64() - CFS_HZ, + rc = filter_statfs(obd, osfs, + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), 0); if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { CDEBUG(D_RPCTRACE,"%s: not enough space for create " LPU64"\n", obd->obd_name, osfs->os_bavail << obd->u.obt.obt_vfsmnt->mnt_sb->s_blocksize_bits); *num = 0; + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NOSPC_BLK; + else { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = OBD_FL_NOSPC_BLK; + } + rc = -ENOSPC; } OBD_FREE(osfs, sizeof(*osfs)); @@ -3798,10 +3861,15 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, } else next_id = filter_last_id(filter, group) + 1; - /* Temporary solution for oid in CMD before fid-on-OST */ - if ((fid_seq_is_mdt0(oa->o_seq) && next_id >= IDIF_MAX_OID) && - (fid_seq_is_cmd(oa->o_seq) && next_id >= OBIF_MAX_OID)) { - CERROR("%s:"POSTID" hit the max IDIF_MAX_OID(1<<48)!\n", + /* Don't create objects beyond the valid range for this SEQ */ + if (unlikely(fid_seq_is_mdt0(group) && + next_id >= IDIF_MAX_OID)) { + CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n", + obd->obd_name, next_id, group); + GOTO(cleanup, rc = -ENOSPC); + } else if (unlikely(!fid_seq_is_mdt0(group) && + next_id >= OBIF_MAX_OID)) { + CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n", obd->obd_name, next_id, group); GOTO(cleanup, rc = -ENOSPC); } @@ -3864,9 +3932,21 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, CERROR("create failed rc = %d\n", rc); if (rc == -ENOSPC) { os_ffree = filter_calc_free_inodes(obd); - if (os_ffree != -1) + if (os_ffree == -1) + GOTO(cleanup, rc); + + if (obd->obd_osfs.os_bavail < + (obd->obd_osfs.os_blocks >> 10)) { + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NOSPC_BLK; + else { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = OBD_FL_NOSPC_BLK; + } + CERROR("%s: free inode "LPU64"\n", obd->obd_name, os_ffree); + } } GOTO(cleanup, rc); } @@ -3876,6 +3956,19 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, dchild->d_inode->i_ino); set_last_id: + /* Set a/c/m time to a insane large negative value at creation + * time so that any timestamp arriving from the client will + * always be newer and update the inode. + * See LU-221 for details */ + iattr.ia_valid = ATTR_ATIME | ATTR_MTIME | ATTR_CTIME; + LTIME_S(iattr.ia_atime) = INT_MIN + 24 * 3600; + LTIME_S(iattr.ia_mtime) = INT_MIN + 24 * 3600; + LTIME_S(iattr.ia_ctime) = INT_MIN + 24 * 3600; + err = fsfilt_setattr(obd, dchild, handle, &iattr, 0); + if (err) + CERROR("unable to initialize a/c/m time of newly" + "created inode\n"); + if (!recreate_obj) { filter_set_last_id(filter, next_id, group); err = filter_update_last_objid(obd, group, 0); @@ -3904,6 +3997,7 @@ set_last_id: if (rc) break; if (cfs_time_after(jiffies, enough_time)) { + i++; CDEBUG(D_RPCTRACE, "%s: precreate slow - want %d got %d \n", obd->obd_name, *num, i); @@ -3919,8 +4013,8 @@ set_last_id: RETURN(rc); } -static int filter_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +int filter_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; struct filter_export_data *fed; @@ -3936,7 +4030,17 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, fed = &exp->exp_filter_data; filter = &obd->u.filter; - if (fed->fed_group != oa->o_seq) { + /* 1.8 client doesn't carry the ocd_group with connect request, + * so the fed_group will always be zero for 1.8 client. */ + if (!(exp->exp_connect_flags & OBD_CONNECT_FULL20)) { + if (oa->o_seq != FID_SEQ_OST_MDT0 && + oa->o_seq != FID_SEQ_LLOG && + oa->o_seq != FID_SEQ_ECHO) { + CERROR("The request from older client has invalid" + " group "LPU64"!\n", oa->o_seq); + RETURN(-EINVAL); + } + } else if (fed->fed_group != oa->o_seq) { CERROR("%s: this export (nid %s) used object group %d " "earlier; now it's trying to use group "LPU64"!" " This could be a bug in the MDS. Please report to " @@ -3959,7 +4063,8 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { - if (oa->o_id > filter_last_id(filter, oa->o_seq)) { + if (!obd->obd_recovering || + oa->o_id > filter_last_id(filter, oa->o_seq)) { CERROR("recreate objid "LPU64" > last id "LPU64"\n", oa->o_id, filter_last_id(filter, oa->o_seq)); rc = -EINVAL; @@ -4002,6 +4107,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, struct llog_cookie *fcc = NULL; int rc, rc2, cleanup_phase = 0, sync = 0; struct iattr iattr; + unsigned long now; ENTRY; rc = filter_auth_capa(exp, NULL, oa->o_seq, @@ -4056,7 +4162,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, if (fcc != NULL) *fcc = oa->o_lcookie; } - DQUOT_INIT(dchild->d_inode); + ll_vfs_dq_init(dchild->d_inode); /* we're gonna truncate it first in order to avoid possible deadlock: * P1 P2 @@ -4070,8 +4176,10 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, * between page lock, i_mutex & starting new journal handle. * (see bug 20321) -johann */ + now = jiffies; down_write(&dchild->d_inode->i_alloc_sem); LOCK_INODE_MUTEX(dchild->d_inode); + fsfilt_check_slow(exp->exp_obd, now, "i_alloc_sem and i_mutex"); /* VBR: version recovery check */ rc = filter_version_get_check(exp, oti, dchild->d_inode); @@ -4203,9 +4311,9 @@ static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } -static int filter_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_off start, obd_off end, - void *capa) +static int filter_sync(struct obd_export *exp, struct obd_info *oinfo, + obd_off start, obd_off end, + struct ptlrpc_request_set *set) { struct lvfs_run_ctxt saved; struct obd_device_target *obt; @@ -4213,22 +4321,23 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, int rc, rc2; ENTRY; - rc = filter_auth_capa(exp, NULL, oa->o_seq, - (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE); + rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_seq, + (struct lustre_capa *)oinfo->oi_capa, + CAPA_OPC_OSS_WRITE); if (rc) RETURN(rc); obt = &exp->exp_obd->u.obt; /* An objid of zero is taken to mean "sync whole filesystem" */ - if (!oa || !(oa->o_valid & OBD_MD_FLID)) { + if (!oinfo->oi_oa || !(oinfo->oi_oa->o_valid & OBD_MD_FLID)) { rc = fsfilt_sync(exp->exp_obd, obt->obt_sb); /* Flush any remaining cancel messages out to the target */ filter_sync_llogs(exp->exp_obd, exp); RETURN(rc); } - dentry = filter_oa2dentry(exp->exp_obd, &oa->o_oi); + dentry = filter_oa2dentry(exp->exp_obd, &oinfo->oi_oa->o_oi); if (IS_ERR(dentry)) RETURN(PTR_ERR(dentry)); @@ -4250,8 +4359,9 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, } UNLOCK_INODE_MUTEX(dentry->d_inode); - oa->o_valid = OBD_MD_FLID; - obdo_from_inode(oa, dentry->d_inode, NULL, FILTER_VALID_FLAGS); + oinfo->oi_oa->o_valid = OBD_MD_FLID; + obdo_from_inode(oinfo->oi_oa, dentry->d_inode, NULL, + FILTER_VALID_FLAGS); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); @@ -4335,6 +4445,12 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen, RETURN(rc); } + if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { + *((__u32 *) val) = obd->u.filter.fo_sync_lock_cancel; + *vallen = sizeof(__u32); + RETURN(0); + } + CDEBUG(D_IOCTL, "invalid key\n"); RETURN(-EINVAL); } @@ -4567,6 +4683,24 @@ static int filter_process_config(struct obd_device *obd, obd_count len, return rc; } +static int filter_notify(struct obd_device *obd, + struct obd_device *unused, + enum obd_notify_event ev, void *data) +{ + switch (ev) { + case OBD_NOTIFY_CONFIG: + LASSERT(obd->obd_no_conn); + cfs_spin_lock(&obd->obd_dev_lock); + obd->obd_no_conn = 0; + cfs_spin_unlock(&obd->obd_dev_lock); + break; + default: + CDEBUG(D_INFO, "%s: Unhandled notification %#x\n", + obd->obd_name, ev); + } + return 0; +} + static struct lvfs_callback_ops filter_lvfs_ops = { l_fid2dentry: filter_lvfs_fid2dentry, }; @@ -4601,6 +4735,7 @@ static struct obd_ops filter_obd_ops = { .o_iocontrol = filter_iocontrol, .o_health_check = filter_health_check, .o_process_config = filter_process_config, + .o_notify = filter_notify, }; quota_interface_t *filter_quota_interface_ref;