X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fllite_lib.c;h=83a131eb3f86d8cf18dfa92a9f68d51ef4159175;hp=43655f8ccd13ffbed1f51392889114b0ec82a550;hb=138544d2c4b5332902d8141c0d119074123e072b;hpb=e4488872e015ae85122e3095e4180c1dfd40c596 diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 43655f8..83a131e 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1,24 +1,41 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Lustre Light Super operations + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Copyright (c) 2002, 2003 Cluster File Systems, Inc. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * This file is part of Lustre, http://www.lustre.org. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lustre/llite/llite_lib.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Lustre Light Super operations */ #define DEBUG_SUBSYSTEM S_LLITE @@ -27,20 +44,23 @@ #include #include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "llite_internal.h" -kmem_cache_t *ll_file_data_slab; -kmem_cache_t *ll_intent_slab; +cfs_mem_cache_t *ll_file_data_slab; + +LIST_HEAD(ll_super_blocks); +spinlock_t ll_sb_lock = SPIN_LOCK_UNLOCKED; extern struct address_space_operations ll_aops; extern struct address_space_operations ll_dir_aops; @@ -49,10 +69,13 @@ extern struct address_space_operations ll_dir_aops; #define log2(n) ffz(~(n)) #endif -struct ll_sb_info *lustre_init_sbi(struct super_block *sb) +static struct ll_sb_info *ll_init_sbi(void) { struct ll_sb_info *sbi = NULL; + unsigned long pages; + struct sysinfo si; class_uuid_t uuid; + int i; ENTRY; OBD_ALLOC(sbi, sizeof(*sbi)); @@ -60,433 +83,654 @@ struct ll_sb_info *lustre_init_sbi(struct super_block *sb) RETURN(NULL); spin_lock_init(&sbi->ll_lock); - INIT_LIST_HEAD(&sbi->ll_pglist); - sbi->ll_pglist_gen = 0; - if (num_physpages < SBI_DEFAULT_RA_MAX / 4) - sbi->ll_ra_info.ra_max_pages = num_physpages / 4; - else - sbi->ll_ra_info.ra_max_pages = SBI_DEFAULT_RA_MAX; + spin_lock_init(&sbi->ll_lco.lco_lock); + spin_lock_init(&sbi->ll_pp_extent_lock); + spin_lock_init(&sbi->ll_process_lock); + sbi->ll_rw_stats_on = 0; + + si_meminfo(&si); + pages = si.totalram - si.totalhigh; + if (pages >> (20 - CFS_PAGE_SHIFT) < 512) { +#ifdef HAVE_BGL_SUPPORT + sbi->ll_async_page_max = pages / 4; +#else + sbi->ll_async_page_max = pages / 2; +#endif + } else { + sbi->ll_async_page_max = (pages / 4) * 3; + } + + sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32, + SBI_DEFAULT_READAHEAD_MAX); + sbi->ll_ra_info.ra_max_pages = sbi->ll_ra_info.ra_max_pages_per_file; + sbi->ll_ra_info.ra_max_read_ahead_whole_pages = + SBI_DEFAULT_READAHEAD_WHOLE_MAX; INIT_LIST_HEAD(&sbi->ll_conn_chain); - INIT_HLIST_HEAD(&sbi->ll_orphan_dentry_list); - INIT_LIST_HEAD(&sbi->ll_mnt_list); - - sema_init(&sbi->ll_gns_sem, 1); - spin_lock_init(&sbi->ll_gns_lock); - INIT_LIST_HEAD(&sbi->ll_gns_sbi_head); - init_waitqueue_head(&sbi->ll_gns_waitq); - init_completion(&sbi->ll_gns_mount_finished); - - /* this later may be reset via /proc/fs/... */ - memcpy(sbi->ll_gns_oname, ".mntinfo", strlen(".mntinfo")); - sbi->ll_gns_oname[strlen(sbi->ll_gns_oname)] = '\0'; - - /* this later may be reset via /proc/fs/... */ - memcpy(sbi->ll_gns_upcall, "/usr/sbin/gns_upcall", - strlen("/usr/sbin/gns_upcall")); - sbi->ll_gns_upcall[strlen(sbi->ll_gns_upcall)] = '\0'; - - /* default values, may be changed via /proc/fs/... */ - sbi->ll_gns_state = LL_GNS_IDLE; - sbi->ll_gns_pending_dentry = NULL; - atomic_set(&sbi->ll_gns_enabled, 1); - sbi->ll_gns_tick = GNS_TICK_TIMEOUT; - sbi->ll_gns_timeout = GNS_MOUNT_TIMEOUT; - - sbi->ll_gns_timer.data = (unsigned long)sbi; - sbi->ll_gns_timer.function = ll_gns_timer_callback; - init_timer(&sbi->ll_gns_timer); - //audit mask - sbi->ll_audit_mask = AUDIT_OFF; - ll_set_sbi(sb, sbi); - - generate_random_uuid(uuid); + INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list); + + ll_generate_random_uuid(uuid); class_uuid_unparse(uuid, &sbi->ll_sb_uuid); + CDEBUG(D_CONFIG, "generated uuid: %s\n", sbi->ll_sb_uuid.uuid); + + spin_lock(&ll_sb_lock); + list_add_tail(&sbi->ll_list, &ll_super_blocks); + spin_unlock(&ll_sb_lock); + +#ifdef ENABLE_LLITE_CHECKSUM + sbi->ll_flags |= LL_SBI_CHECKSUM; +#endif + +#ifdef HAVE_LRU_RESIZE_SUPPORT + sbi->ll_flags |= LL_SBI_LRU_RESIZE; +#endif + +#ifdef HAVE_EXPORT___IGET + INIT_LIST_HEAD(&sbi->ll_deathrow); + spin_lock_init(&sbi->ll_deathrow_lock); +#endif + for (i = 0; i <= LL_PROCESS_HIST_MAX; i++) { + spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_r_hist.oh_lock); + spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock); + } + + /* metadata statahead is enabled by default */ + sbi->ll_sa_max = LL_SA_RPC_DEF; + RETURN(sbi); } -void lustre_free_sbi(struct super_block *sb) +void ll_free_sbi(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); ENTRY; if (sbi != NULL) { - list_del(&sbi->ll_gns_sbi_head); - del_timer(&sbi->ll_gns_timer); + spin_lock(&ll_sb_lock); + list_del(&sbi->ll_list); + spin_unlock(&ll_sb_lock); OBD_FREE(sbi, sizeof(*sbi)); } - ll_set_sbi(sb, NULL); EXIT; } -int lustre_init_dt_desc(struct ll_sb_info *sbi) -{ - __u32 valsize; - int rc = 0; - ENTRY; - - valsize = sizeof(sbi->ll_dt_desc); - memset(&sbi->ll_dt_desc, 0, sizeof(sbi->ll_dt_desc)); - rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, - "lovdesc", &valsize, &sbi->ll_dt_desc); - RETURN(rc); -} +static struct dentry_operations ll_d_root_ops = { +#ifdef DCACHE_LUSTRE_INVALID + .d_compare = ll_dcompare, +#endif + .d_revalidate = ll_revalidate_nd, +}; -static int lustre_connect_mds(struct super_block *sb, char *lmv, - struct obd_connect_data *data, - char *mds_security, int async, int pag) +static int client_common_fill_super(struct super_block *sb, char *md, char *dt) { + struct inode *root = 0; struct ll_sb_info *sbi = ll_s2sbi(sb); - struct lustre_handle md_conn = {0, }; - struct obd_device *md_obd; + struct obd_device *obd; + struct obd_capa *oc = NULL; struct obd_statfs osfs; - unsigned long sec_flags; - __u32 valsize; - int err = 0; - ENTRY; - - md_obd = class_name2obd(lmv); - if (!md_obd) { - CERROR("MDC %s: not setup or attached\n", lmv); + struct ptlrpc_request *request = NULL; + struct obd_connect_data *data = NULL; + struct obd_uuid *uuid; + struct lustre_md lmd; + obd_valid valid; + int size, err, checksum; + ENTRY; + + obd = class_name2obd(md); + if (!obd) { + CERROR("MD %s: not setup or attached\n", md); RETURN(-EINVAL); } - obd_set_info(md_obd->obd_self_export, strlen("async"), "async", - sizeof(async), &async); + OBD_ALLOC_PTR(data); + if (data == NULL) + RETURN(-ENOMEM); - if (mds_security == NULL) - mds_security = "null"; - - err = obd_set_info(md_obd->obd_self_export, strlen("sec"), "sec", - strlen(mds_security), mds_security); - - if (err) { - CERROR("LMV %s: failed to set security %s, err %d\n", - lmv, mds_security, err); - RETURN(err); + if (proc_lustre_fs_root) { + err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb, + dt, md); + if (err < 0) + CERROR("could not register mount in /proc/fs/lustre\n"); } - if (pag) { - sec_flags = PTLRPC_SEC_FL_PAG; - err = obd_set_info(md_obd->obd_self_export, - strlen("sec_flags"), "sec_flags", - sizeof(sec_flags), &sec_flags); - if (err) { - OBD_FREE(data, sizeof(*data)); - RETURN(err); - } - } + /* indicate the features supported by this client */ + data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | + OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID | + OBD_CONNECT_VERSION | OBD_CONNECT_MDS_CAPA | + OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET| + OBD_CONNECT_FID | OBD_CONNECT_AT | + OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT | + OBD_CONNECT_VBR | OBD_CONNECT_SOM; + +#ifdef HAVE_LRU_RESIZE_SUPPORT + if (sbi->ll_flags & LL_SBI_LRU_RESIZE) + data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; +#endif +#ifdef CONFIG_FS_POSIX_ACL + data->ocd_connect_flags |= OBD_CONNECT_ACL; +#endif + data->ocd_ibits_known = MDS_INODELOCK_FULL; + data->ocd_version = LUSTRE_VERSION_CODE; + + if (sb->s_flags & MS_RDONLY) + data->ocd_connect_flags |= OBD_CONNECT_RDONLY; + if (sbi->ll_flags & LL_SBI_USER_XATTR) + data->ocd_connect_flags |= OBD_CONNECT_XATTR; + +#ifdef HAVE_MS_FLOCK_LOCK + /* force vfs to use lustre handler for flock() calls - bug 10743 */ + sb->s_flags |= MS_FLOCK_LOCK; +#endif + + if (sbi->ll_flags & LL_SBI_FLOCK) + sbi->ll_fop = &ll_file_operations_flock; + else if (sbi->ll_flags & LL_SBI_LOCALFLOCK) + sbi->ll_fop = &ll_file_operations; + else + sbi->ll_fop = &ll_file_operations_noflock; + + /* real client */ + data->ocd_connect_flags |= OBD_CONNECT_REAL; + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) + data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT_FORCE; - err = obd_connect(&md_conn, md_obd, &sbi->ll_sb_uuid, data, - OBD_OPT_REAL_CLIENT); + err = obd_connect(NULL, &sbi->ll_md_exp, obd, &sbi->ll_sb_uuid, data, NULL); if (err == -EBUSY) { - CERROR("An MDS (lmv %s) is performing recovery, of which this" - " client is not a part. Please wait for recovery to " - "complete, abort, or time out.\n", lmv); + LCONSOLE_ERROR_MSG(0x14f, "An MDT (md %s) is performing " + "recovery, of which this client is not a " + "part. Please wait for recovery to complete," + " abort, or time out.\n", md); GOTO(out, err); } else if (err) { - CERROR("cannot connect to %s: rc = %d\n", lmv, err); + CERROR("cannot connect to %s: rc = %d\n", md, err); GOTO(out, err); } - sbi->ll_md_exp = class_conn2export(&md_conn); - - err = obd_statfs(md_obd, &osfs, jiffies - HZ); + err = obd_fid_init(sbi->ll_md_exp); + if (err) { + CERROR("Can't init metadata layer FID infrastructure, " + "rc %d\n", err); + GOTO(out_md, err); + } + + err = obd_statfs(obd, &osfs, cfs_time_current_64() - HZ, 0); if (err) - GOTO(out_disconnect, err); + GOTO(out_md_fid, err); - if (!osfs.os_bsize) { - CERROR("Invalid block size is detected."); - GOTO(out_disconnect, err); + size = sizeof(*data); + err = obd_get_info(sbi->ll_md_exp, sizeof(KEY_CONN_DATA), + KEY_CONN_DATA, &size, data, NULL); + if (err) { + CERROR("Get connect data failed: %d \n", err); + GOTO(out_md, err); } - sb->s_magic = LL_SUPER_MAGIC; + LASSERT(osfs.os_bsize); sb->s_blocksize = osfs.os_bsize; sb->s_blocksize_bits = log2(osfs.os_bsize); + sb->s_magic = LL_SUPER_MAGIC; + + /* for bug 11559. in $LINUX/fs/read_write.c, function do_sendfile(): + * retval = in_file->f_op->sendfile(...); + * if (*ppos > max) + * retval = -EOVERFLOW; + * + * it will check if *ppos is greater than max. However, max equals to + * s_maxbytes, which is a negative integer in a x86_64 box since loff_t + * has been defined as a signed long long ineger in linux kernel. */ +#if BITS_PER_LONG == 64 + sb->s_maxbytes = PAGE_CACHE_MAXBYTES >> 1; +#else sb->s_maxbytes = PAGE_CACHE_MAXBYTES; +#endif + sbi->ll_namelen = osfs.os_namelen; + sbi->ll_max_rw_chunk = LL_DEFAULT_MAX_RW_CHUNK; - /* in 2.6.x FS is not allowed to form s_dev */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - { - kdev_t devno; - - devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, - strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid)); - - sb->s_dev = devno; + if ((sbi->ll_flags & LL_SBI_USER_XATTR) && + !(data->ocd_connect_flags & OBD_CONNECT_XATTR)) { + LCONSOLE_INFO("Disabling user_xattr feature because " + "it is not supported on the server\n"); + sbi->ll_flags &= ~LL_SBI_USER_XATTR; } -#endif - /* after statfs, we are supposed to have connected to MDSs, - * so it's ok to check remote flag returned. - */ - valsize = sizeof(&sbi->ll_remote); - err = obd_get_info(sbi->ll_md_exp, strlen("remote_flag"), "remote_flag", - &valsize, &sbi->ll_remote); - if (err) { - CERROR("fail to obtain remote flag\n"); - GOTO(out_disconnect, err); + if (data->ocd_connect_flags & OBD_CONNECT_ACL) { +#ifdef MS_POSIXACL + sb->s_flags |= MS_POSIXACL; +#endif + sbi->ll_flags |= LL_SBI_ACL; + } else { + LCONSOLE_INFO("client wants to enable acl, but mdt not!\n"); +#ifdef MS_POSIXACL + sb->s_flags &= ~MS_POSIXACL; +#endif + sbi->ll_flags &= ~LL_SBI_ACL; } -out_disconnect: - if (err) - obd_disconnect(sbi->ll_md_exp, 0); -out: - RETURN(err); -} + if (data->ocd_connect_flags & OBD_CONNECT_JOIN) + sbi->ll_flags |= LL_SBI_JOIN; -static int lustre_connect_ost(struct super_block *sb, char *lov, - struct obd_connect_data *data, - char *oss_security, int async, int pag) -{ - struct ll_sb_info *sbi = ll_s2sbi(sb); - struct lustre_handle dt_conn = {0, }; - struct obd_device *obd = NULL; - unsigned long sec_flags; - int err, mdsize; + if (data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) { + if (!(sbi->ll_flags & LL_SBI_RMT_CLIENT)) { + sbi->ll_flags |= LL_SBI_RMT_CLIENT; + LCONSOLE_INFO("client is set as remote by default.\n"); + } + } else { + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { + sbi->ll_flags &= ~LL_SBI_RMT_CLIENT; + LCONSOLE_INFO("client claims to be remote, but server " + "rejected, forced to be local.\n"); + } + } - obd = class_name2obd(lov); - if (!obd) { - CERROR("OSC %s: not setup or attached\n", lov); - GOTO(out, err = -EINVAL); + if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) { + LCONSOLE_INFO("client enabled MDS capability!\n"); + sbi->ll_flags |= LL_SBI_MDS_CAPA; } - obd_set_info(obd->obd_self_export, strlen("async"), "async", - sizeof(async), &async); - if (oss_security == NULL) - oss_security = "null"; + if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) { + LCONSOLE_INFO("client enabled OSS capability!\n"); + sbi->ll_flags |= LL_SBI_OSS_CAPA; + } - err = obd_set_info(obd->obd_self_export, strlen("sec"), "sec", - strlen(oss_security), oss_security); - if (err) { - CERROR("LOV %s: failed to set security %s, err %d\n", - lov, oss_security, err); - RETURN(err); + obd = class_name2obd(dt); + if (!obd) { + CERROR("DT %s: not setup or attached\n", dt); + GOTO(out_md_fid, err = -ENODEV); + } + + data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | + OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | + OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK| + OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT | + OBD_CONNECT_OSS_CAPA | OBD_CONNECT_VBR| + OBD_CONNECT_SOM; + + if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) { + /* OBD_CONNECT_CKSUM should always be set, even if checksums are + * disabled by default, because it can still be enabled on the + * fly via /proc. As a consequence, we still need to come to an + * agreement on the supported algorithms at connect time */ + data->ocd_connect_flags |= OBD_CONNECT_CKSUM; + + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_CKSUM_ADLER_ONLY)) + data->ocd_cksum_types = OBD_CKSUM_ADLER; + else + /* send the list of supported checksum types */ + data->ocd_cksum_types = OBD_CKSUM_ALL; } - /* FIXME Because of the async nature of file i/o, we never know - * who is actually dirty the pages; and any process have chance - * to trigger dirty-flushing within its own process context. So - * for simplicity we simply use root's credential, we suppose root - * always have credential. - */ - if (pag) - sec_flags = PTLRPC_SEC_FL_PAG; - else - sec_flags = PTLRPC_SEC_FL_OSS; +#ifdef HAVE_LRU_RESIZE_SUPPORT + data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; +#endif + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) + data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT_FORCE; - err = obd_set_info(obd->obd_self_export, - strlen("sec_flags"), "sec_flags", - sizeof(sec_flags), &sec_flags); - if (err) { - OBD_FREE(data, sizeof(*data)); - RETURN(err); - } + CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d " + "ocd_grant: %d\n", data->ocd_connect_flags, + data->ocd_version, data->ocd_grant); - err = obd_connect(&dt_conn, obd, &sbi->ll_sb_uuid, data, 0); + obd->obd_upcall.onu_owner = &sbi->ll_lco; + obd->obd_upcall.onu_upcall = cl_ocd_update; + data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT; + + err = obd_connect(NULL, &sbi->ll_dt_exp, obd, &sbi->ll_sb_uuid, data, NULL); if (err == -EBUSY) { - CERROR("An OST (lov %s) is performing recovery, of which this" - " client is not a part. Please wait for recovery to " - "complete, abort, or time out.\n", lov); - GOTO(out, err); + LCONSOLE_ERROR_MSG(0x150, "An OST (dt %s) is performing " + "recovery, of which this client is not a " + "part. Please wait for recovery to " + "complete, abort, or time out.\n", dt); + GOTO(out_md_fid, err); } else if (err) { - CERROR("cannot connect to %s: rc = %d\n", lov, err); - GOTO(out, err); + CERROR("Cannot connect to %s: rc = %d\n", dt, err); + GOTO(out_md_fid, err); } - sbi->ll_dt_exp = class_conn2export(&dt_conn); - err = lustre_init_dt_desc(sbi); - + err = obd_fid_init(sbi->ll_dt_exp); if (err) { - CWARN("init dt_desc error %d \n", err); - GOTO(out, err = 0); + CERROR("Can't init data layer FID infrastructure, " + "rc %d\n", err); + GOTO(out_dt, err); } - mdsize = obd_size_diskmd(sbi->ll_dt_exp, NULL); - obd_init_ea_size(sbi->ll_md_exp, mdsize, sbi->ll_dt_desc.ld_tgt_count * - sizeof(struct llog_cookie)); -out: - RETURN(err); -} -extern struct dentry_operations ll_d_ops; - -static int lustre_init_root_inode(struct super_block *sb) -{ - struct ll_sb_info *sbi = ll_s2sbi(sb); - struct ptlrpc_request *request = NULL; - struct inode *root = NULL; - struct lustre_md md; - int err = 0; - ENTRY; + spin_lock(&sbi->ll_lco.lco_lock); + sbi->ll_lco.lco_flags = data->ocd_connect_flags; + sbi->ll_lco.lco_md_exp = sbi->ll_md_exp; + sbi->ll_lco.lco_dt_exp = sbi->ll_dt_exp; + spin_unlock(&sbi->ll_lco.lco_lock); - err = md_getstatus(sbi->ll_md_exp, &sbi->ll_rootid); + fid_zero(&sbi->ll_root_fid); + err = md_getstatus(sbi->ll_md_exp, &sbi->ll_root_fid, &oc); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out, err); + GOTO(out_lock_cn_cb, err); + } + if (!fid_is_sane(&sbi->ll_root_fid)) { + CERROR("Invalid root fid during mount\n"); + GOTO(out_lock_cn_cb, err = -EINVAL); } - CDEBUG(D_SUPER, "rootid "DLID4"\n", OLID4(&sbi->ll_rootid)); + CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&sbi->ll_root_fid)); sb->s_op = &lustre_super_operations; - - /* make root inode */ - err = md_getattr(sbi->ll_md_exp, &sbi->ll_rootid, - (OBD_MD_FLNOTOBD | OBD_MD_FLBLOCKS | OBD_MD_FID), - NULL, NULL, 0, 0, NULL, &request); + sb->s_export_op = &lustre_export_operations; + + /* make root inode + * XXX: move this to after cbd setup? */ + valid = OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS | OBD_MD_FLMDSCAPA; + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) + valid |= OBD_MD_FLRMTPERM; + else if (sbi->ll_flags & LL_SBI_ACL) + valid |= OBD_MD_FLACL; + + err = md_getattr(sbi->ll_md_exp, &sbi->ll_root_fid, oc, valid, 0, + &request); + if (oc) + capa_put(oc); if (err) { CERROR("md_getattr failed for root: rc = %d\n", err); - GOTO(out, err); + GOTO(out_lock_cn_cb, err); } - - err = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, - sbi->ll_dt_exp, &md); + memset(&lmd, 0, sizeof(lmd)); + err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp, + sbi->ll_md_exp, &lmd); if (err) { CERROR("failed to understand root inode md: rc = %d\n", err); - ptlrpc_req_finished(request); - GOTO(out, err); + ptlrpc_req_finished (request); + GOTO(out_lock_cn_cb, err); } - LASSERT(id_ino(&sbi->ll_rootid) != 0); - root = ll_iget(sb, id_ino(&sbi->ll_rootid), &md); - + LASSERT(fid_is_sane(&sbi->ll_root_fid)); + root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &lmd); + md_free_lustre_md(sbi->ll_md_exp, &lmd); ptlrpc_req_finished(request); - if (root == NULL || is_bad_inode(root)) { - if (md.lsm != NULL) - obd_free_memmd(sbi->ll_dt_exp, &md.lsm); - if (md.mea != NULL) - obd_free_memmd(sbi->ll_md_exp, - (struct lov_stripe_md**)&md.mea); + if (root == NULL || IS_ERR(root)) { + if (lmd.lsm) + obd_free_memmd(sbi->ll_dt_exp, &lmd.lsm); +#ifdef CONFIG_FS_POSIX_ACL + if (lmd.posix_acl) { + posix_acl_release(lmd.posix_acl); + lmd.posix_acl = NULL; + } +#endif + err = IS_ERR(root) ? PTR_ERR(root) : -EBADF; + root = NULL; CERROR("lustre_lite: bad iget4 for root\n"); - GOTO(out_root, err = -EBADF); + GOTO(out_root, err); + } + + err = ll_close_thread_start(&sbi->ll_lcq); + if (err) { + CERROR("cannot start close thread: rc %d\n", err); + GOTO(out_root, err); + } + +#ifdef CONFIG_FS_POSIX_ACL + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { + rct_init(&sbi->ll_rct); + et_init(&sbi->ll_et); } +#endif + + checksum = sbi->ll_flags & LL_SBI_CHECKSUM; + err = obd_set_info_async(sbi->ll_dt_exp, sizeof(KEY_CHECKSUM), + KEY_CHECKSUM, sizeof(checksum), &checksum, + NULL); + cl_sb_init(sb); + sb->s_root = d_alloc_root(root); - sb->s_root->d_op = &ll_d_ops; + if (data != NULL) + OBD_FREE(data, sizeof(*data)); + + sb->s_root->d_op = &ll_d_root_ops; + + sbi->ll_sdev_orig = sb->s_dev; + + /* We set sb->s_dev equal on all lustre clients in order to support + * NFS export clustering. NFSD requires that the FSID be the same + * on all clients. */ + /* s_dev is also used in lt_compare() to compare two fs, but that is + * only a node-local comparison. */ + uuid = obd_get_uuid(sbi->ll_md_exp); + if (uuid != NULL) + sb->s_dev = get_uuid2int(uuid->uuid, strlen(uuid->uuid)); + + RETURN(err); out_root: - if (err) + if (root) iput(root); +out_lock_cn_cb: + obd_fid_fini(sbi->ll_dt_exp); +out_dt: + obd_disconnect(sbi->ll_dt_exp); + sbi->ll_dt_exp = NULL; +out_md_fid: + obd_fid_fini(sbi->ll_md_exp); +out_md: + obd_disconnect(sbi->ll_md_exp); + sbi->ll_md_exp = NULL; out: - RETURN(err); + if (data != NULL) + OBD_FREE_PTR(data); + lprocfs_unregister_mountpoint(sbi); + return err; } -int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov, - char *gkc, int async, char *mds_security, - char *oss_security, __u32 *nllu, int pag, - __u64 *remote) +int ll_get_max_mdsize(struct ll_sb_info *sbi, int *lmmsize) { - struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_connect_data *data; - int err; - ENTRY; + int size, rc; - /*process the connect flags*/ - if ((*remote & (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) == - (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) { - CERROR("wrong remote flag "LPX64"\n", *remote); - RETURN(-EINVAL); - } + *lmmsize = obd_size_diskmd(sbi->ll_dt_exp, NULL); + size = sizeof(int); + rc = obd_get_info(sbi->ll_md_exp, sizeof(KEY_MAX_EASIZE), + KEY_MAX_EASIZE, &size, lmmsize, NULL); + if (rc) + CERROR("Get max mdsize error rc %d \n", rc); - OBD_ALLOC(data, sizeof(*data)); - if (!data) - RETURN(-ENOMEM); + RETURN(rc); +} + +void ll_dump_inode(struct inode *inode) +{ + struct list_head *tmp; + struct dentry *dentry; + int dentry_count = 0; - data->ocd_connect_flags |= *remote & (OBD_CONNECT_LOCAL | - OBD_CONNECT_REMOTE); - memcpy(data->ocd_nllu, nllu, sizeof(data->ocd_nllu)); + LASSERT(inode != NULL); + CERROR("inode %p dump: dev=%s ino=%lu mode=%o count=%u state %lx\n", + inode, ll_i2mdexp(inode)->exp_obd->obd_name, inode->i_ino, + inode->i_mode, atomic_read(&inode->i_count), inode->i_state); - if (proc_lustre_fs_root) { - err = lprocfs_register_mountpoint(proc_lustre_fs_root, - sb, lov, lmv); - if (err < 0) - CERROR("could not register mount in /proc/lustre"); + list_for_each(tmp, &inode->i_dentry) { + dentry = list_entry(tmp, struct dentry, d_alias); + CERROR("Alias[%d] dentry %p dump: name=%.*s parent=%.*s (%p), inode=%p, count=%u, " + "flags=0x%x, fsdata=%p\n", dentry_count, dentry, + dentry->d_name.len, dentry->d_name.name, + dentry->d_parent->d_name.len, dentry->d_parent->d_name.name, + dentry->d_parent, dentry->d_inode, atomic_read(&dentry->d_count), + dentry->d_flags, dentry->d_fsdata); + + dentry_count++; } - /*connect mds */ - err = lustre_connect_mds(sb, lmv, data, mds_security, async, pag); - if (err) - GOTO(out, err); +} - /*connect OST*/ - err = lustre_connect_ost(sb, lov, data, oss_security, async, pag); - if (err) - GOTO(out_lmv, err); +void lustre_dump_dentry(struct dentry *dentry, int recur) +{ + struct list_head *tmp; + int subdirs = 0; - /*connect GSS*/ - err = lustre_init_crypto(sb, gkc, data, async); - if (err) { - CERROR("Could not connect to GSS err %d\n", err); - err = 0; + LASSERT(dentry != NULL); + + list_for_each(tmp, &dentry->d_subdirs) + subdirs++; + + CERROR("dentry %p dump: name=%.*s parent=%.*s (%p), inode=%p, count=%u," + " flags=0x%x, fsdata=%p, %d subdirs\n", dentry, + dentry->d_name.len, dentry->d_name.name, + dentry->d_parent->d_name.len, dentry->d_parent->d_name.name, + dentry->d_parent, dentry->d_inode, atomic_read(&dentry->d_count), + dentry->d_flags, dentry->d_fsdata, subdirs); + if (dentry->d_inode != NULL) + ll_dump_inode(dentry->d_inode); + + if (recur == 0) + return; + + list_for_each(tmp, &dentry->d_subdirs) { + struct dentry *d = list_entry(tmp, struct dentry, d_child); + lustre_dump_dentry(d, recur - 1); } - - err = lustre_init_root_inode(sb); - if (err) - GOTO(out_gks, err); +} - err = ll_close_thread_start(&sbi->ll_lcq); - if (err) { - CERROR("cannot start close thread: rc %d\n", err); - GOTO(out_root, err); +#ifdef HAVE_EXPORT___IGET +static void prune_dir_dentries(struct inode *inode) +{ + struct dentry *dentry, *prev = NULL; + + /* due to lustre specific logic, a directory + * can have few dentries - a bug from VFS POV */ +restart: + spin_lock(&dcache_lock); + if (!list_empty(&inode->i_dentry)) { + dentry = list_entry(inode->i_dentry.prev, + struct dentry, d_alias); + /* in order to prevent infinite loops we + * break if previous dentry is busy */ + if (dentry != prev) { + prev = dentry; + dget_locked(dentry); + spin_unlock(&dcache_lock); + + /* try to kill all child dentries */ + lock_dentry(dentry); + shrink_dcache_parent(dentry); + unlock_dentry(dentry); + dput(dentry); + + /* now try to get rid of current dentry */ + d_prune_aliases(inode); + goto restart; + } } + spin_unlock(&dcache_lock); +} - ll_gns_add_timer(sbi); +static void prune_deathrow_one(struct ll_inode_info *lli) +{ + struct inode *inode = ll_info2i(lli); + + /* first, try to drop any dentries - they hold a ref on the inode */ + if (S_ISDIR(inode->i_mode)) + prune_dir_dentries(inode); + else + d_prune_aliases(inode); + + + /* if somebody still uses it, leave it */ + LASSERT(atomic_read(&inode->i_count) > 0); + if (atomic_read(&inode->i_count) > 1) + goto out; + + CDEBUG(D_INODE, "inode %lu/%u(%d) looks a good candidate for prune\n", + inode->i_ino,inode->i_generation, atomic_read(&inode->i_count)); + + /* seems nobody uses it anymore */ + inode->i_nlink = 0; - /* making vm readahead 0 for 2.4.x. In the case of 2.6.x, - backing dev info assigned to inode mapping is used for - determining maximal readahead. */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)) && \ - !defined(KERNEL_HAS_AS_MAX_READAHEAD) - /* bug 2805 - set VM readahead to zero */ - vm_max_readahead = vm_min_readahead = 0; -#endif - sb->s_flags |= MS_POSIXACL; -#ifdef S_PDIROPS - CWARN("Enabling PDIROPS\n"); - sb->s_flags |= S_PDIROPS; -#endif - if (data != NULL) - OBD_FREE(data, sizeof(*data)); - RETURN(err); -out_root: - if (sb->s_root) - dput(sb->s_root); -out_gks: - lustre_destroy_crypto(sb); -out_lmv: - obd_disconnect(sbi->ll_md_exp, 0); out: - if (data != NULL) - OBD_FREE(data, sizeof(*data)); - lprocfs_unregister_mountpoint(sbi); - RETURN(err); + iput(inode); + return; +} + +static void prune_deathrow(struct ll_sb_info *sbi, int try) +{ + struct ll_inode_info *lli; + int empty; + + do { + if (need_resched() && try) + break; + + if (try) { + if (!spin_trylock(&sbi->ll_deathrow_lock)) + break; + } else { + spin_lock(&sbi->ll_deathrow_lock); + } + + empty = 1; + lli = NULL; + if (!list_empty(&sbi->ll_deathrow)) { + lli = list_entry(sbi->ll_deathrow.next, + struct ll_inode_info, + lli_dead_list); + list_del_init(&lli->lli_dead_list); + if (!list_empty(&sbi->ll_deathrow)) + empty = 0; + } + spin_unlock(&sbi->ll_deathrow_lock); + + if (lli) + prune_deathrow_one(lli); + + } while (empty == 0); } +#else /* !HAVE_EXPORT___IGET */ +#define prune_deathrow(sbi, try) do {} while (0) +#endif /* HAVE_EXPORT___IGET */ -void lustre_common_put_super(struct super_block *sb) +void client_common_put_super(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); - struct hlist_node *tmp, *next; ENTRY; - ll_gns_del_timer(sbi); - ll_close_thread_stop(sbi->ll_lcq); +#ifdef CONFIG_FS_POSIX_ACL + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { + et_fini(&sbi->ll_et); + rct_fini(&sbi->ll_rct); + } +#endif + + obd_cancel_unused(sbi->ll_dt_exp, NULL, 0, NULL); + + ll_close_thread_shutdown(sbi->ll_lcq); + + cl_sb_fini(sb); + + /* destroy inodes in deathrow */ + prune_deathrow(sbi, 0); list_del(&sbi->ll_conn_chain); - obd_disconnect(sbi->ll_dt_exp, 0); - - lustre_destroy_crypto(sb); + + obd_fid_fini(sbi->ll_dt_exp); + obd_disconnect(sbi->ll_dt_exp); + sbi->ll_dt_exp = NULL; lprocfs_unregister_mountpoint(sbi); - if (sbi->ll_proc_root) { - lprocfs_remove(sbi->ll_proc_root); - sbi->ll_proc_root = NULL; - } - obd_disconnect(sbi->ll_md_exp, 0); + obd_fid_fini(sbi->ll_md_exp); + obd_disconnect(sbi->ll_md_exp); + sbi->ll_md_exp = NULL; - // We do this to get rid of orphaned dentries. That is not really trw. - hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { - struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash); - CWARN("orphan dentry %.*s (%p->%p) at unmount\n", - dentry->d_name.len, dentry->d_name.name, dentry, next); - shrink_dcache_parent(dentry); - } + EXIT; +} + +void ll_kill_super(struct super_block *sb) +{ + struct ll_sb_info *sbi; + + ENTRY; + + /* not init sb ?*/ + if (!(sb->s_flags & MS_ACTIVE)) + return; + + sbi = ll_s2sbi(sb); + /* we need restore s_dev from changed for clustred NFS before put_super + * because new kernels have cached s_dev and change sb->s_dev in + * put_super not affected real removing devices */ + if (sbi) + sb->s_dev = sbi->ll_sdev_orig; EXIT; } @@ -514,602 +758,289 @@ char *ll_read_opt(const char *opt, char *data) RETURN(retval); } -int ll_set_opt(const char *opt, char *data, int fl) +static inline int ll_set_opt(const char *opt, char *data, int fl) { - ENTRY; - - CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if (strncmp(opt, data, strlen(opt))) - RETURN(0); + if (strncmp(opt, data, strlen(opt)) != 0) + return(0); else - RETURN(fl); + return(fl); } -void ll_options(char *options, char **lov, char **lmv, char **gkc, - char **mds_sec, char **oss_sec, int *async, int *flags) +/* non-client-specific mount options are parsed in lmd_parse */ +static int ll_options(char *options, int *flags) { - char *this_char; -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) - char *opt_ptr = options; -#endif + int tmp; + char *s1 = options, *s2; ENTRY; - if (!options) { - EXIT; - return; - } + if (!options) + RETURN(0); - *async = 0; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - for (this_char = strtok (options, ","); - this_char != NULL; - this_char = strtok (NULL, ",")) { -#else - while ((this_char = strsep (&opt_ptr, ",")) != NULL) { -#endif - CDEBUG(D_SUPER, "this_char %s\n", this_char); - if (!*lov && (*lov = ll_read_opt("osc", this_char))) - continue; - if (!*lmv && (*lmv = ll_read_opt("mdc", this_char))) - continue; - if (!*gkc && (*gkc = ll_read_opt("gkc", this_char))) - continue; - if (!strncmp(this_char, "lasync", strlen("lasync"))) { - *async = 1; - continue; + CDEBUG(D_CONFIG, "Parsing opts %s\n", options); + + while (*s1) { + CDEBUG(D_SUPER, "next opt=%s\n", s1); + tmp = ll_set_opt("nolock", s1, LL_SBI_NOLCK); + if (tmp) { + *flags |= tmp; + goto next; } - if (!*mds_sec && (*mds_sec = ll_read_opt("mds_sec", this_char))) - continue; - if (!*oss_sec && (*oss_sec = ll_read_opt("oss_sec", this_char))) - continue; - if (!(*flags & LL_SBI_NOLCK) && - ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, - LL_SBI_NOLCK))) - continue; - } - - EXIT; + tmp = ll_set_opt("flock", s1, LL_SBI_FLOCK); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("localflock", s1, LL_SBI_LOCALFLOCK); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("noflock", s1, LL_SBI_FLOCK|LL_SBI_LOCALFLOCK); + if (tmp) { + *flags &= ~tmp; + goto next; + } + tmp = ll_set_opt("user_xattr", s1, LL_SBI_USER_XATTR); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("nouser_xattr", s1, LL_SBI_USER_XATTR); + if (tmp) { + *flags &= ~tmp; + goto next; + } + tmp = ll_set_opt("acl", s1, LL_SBI_ACL); + if (tmp) { + /* Ignore deprecated mount option. The client will + * always try to mount with ACL support, whether this + * is used depends on whether server supports it. */ + goto next; + } + tmp = ll_set_opt("noacl", s1, LL_SBI_ACL); + if (tmp) { + goto next; + } + tmp = ll_set_opt("remote_client", s1, LL_SBI_RMT_CLIENT); + if (tmp) { + *flags |= tmp; + goto next; + } + + tmp = ll_set_opt("checksum", s1, LL_SBI_CHECKSUM); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("nochecksum", s1, LL_SBI_CHECKSUM); + if (tmp) { + *flags &= ~tmp; + goto next; + } + tmp = ll_set_opt("lruresize", s1, LL_SBI_LRU_RESIZE); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("nolruresize", s1, LL_SBI_LRU_RESIZE); + if (tmp) { + *flags &= ~tmp; + goto next; + } + tmp = ll_set_opt("lazystatfs", s1, LL_SBI_LAZYSTATFS); + if (tmp) { + *flags |= tmp; + goto next; + } + tmp = ll_set_opt("nolazystatfs", s1, LL_SBI_LAZYSTATFS); + if (tmp) { + *flags &= ~tmp; + goto next; + } + + LCONSOLE_ERROR_MSG(0x152, "Unknown option '%s', won't mount.\n", + s1); + RETURN(-EINVAL); + +next: + /* Find next opt */ + s2 = strchr(s1, ','); + if (s2 == NULL) + break; + s1 = s2 + 1; + } + RETURN(0); } void ll_lli_init(struct ll_inode_info *lli) { - sema_init(&lli->lli_open_sem, 1); + lli->lli_inode_magic = LLI_INODE_MAGIC; sema_init(&lli->lli_size_sem, 1); + sema_init(&lli->lli_write_sem, 1); lli->lli_flags = 0; - lli->lli_size_pid = 0; lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; spin_lock_init(&lli->lli_lock); - INIT_LIST_HEAD(&lli->lli_pending_write_llaps); - INIT_LIST_HEAD(&lli->lli_close_item); + INIT_LIST_HEAD(&lli->lli_close_list); lli->lli_inode_magic = LLI_INODE_MAGIC; - memset(&lli->lli_id, 0, sizeof(lli->lli_id)); sema_init(&lli->lli_och_sem, 1); lli->lli_mds_read_och = lli->lli_mds_write_och = NULL; lli->lli_mds_exec_och = NULL; lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0; lli->lli_open_fd_exec_count = 0; - lli->lli_audit_mask = AUDIT_OFF; - lli->lli_key_info = NULL; - init_waitqueue_head(&lli->lli_dirty_wait); - lli->lli_io_epoch = 0; - INIT_LIST_HEAD(&lli->lli_capas); + INIT_LIST_HEAD(&lli->lli_dead_list); + lli->lli_remote_perms = NULL; + lli->lli_rmtperm_utime = 0; + sema_init(&lli->lli_rmtperm_sem, 1); + INIT_LIST_HEAD(&lli->lli_oss_capas); } -int ll_fill_super(struct super_block *sb, void *data, int silent) +int ll_fill_super(struct super_block *sb) { + struct lustre_profile *lprof; + struct lustre_sb_info *lsi = s2lsi(sb); struct ll_sb_info *sbi; - char *lov = NULL, *lmv = NULL, *gkc = NULL; - char *mds_sec = NULL; - char *oss_sec = NULL; - int async, err; - __u32 nllu[2] = { NOBODY_UID, NOBODY_GID }; - __u64 remote_flag = 0; + char *dt = NULL, *md = NULL; + char *profilenm = get_profile_name(sb); + struct config_llog_instance cfg = {0, }; + char ll_instance[sizeof(sb) * 2 + 1]; + int err; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - sbi = lustre_init_sbi(sb); - if (!sbi) - RETURN(-ENOMEM); + cfs_module_get(); - sbi->ll_flags |= LL_SBI_READAHEAD; - ll_options(data, &lov, &lmv, &gkc, &mds_sec, &oss_sec, - &async, &sbi->ll_flags); - - if (!lov || !lmv) { - CERROR("no osc %p or no mdc %p\n", lov, lmv); - GOTO(out, err = -EINVAL); + /* client additional sb info */ + lsi->lsi_llsbi = sbi = ll_init_sbi(); + if (!sbi) { + cfs_module_put(); + RETURN(-ENOMEM); } - - err = lustre_common_fill_super(sb, lmv, lov, gkc, async, mds_sec, - oss_sec, nllu, 0, &remote_flag); - EXIT; -out: - if (err) - lustre_free_sbi(sb); - - if (lmv) - OBD_FREE(lmv, strlen(lmv) + 1); - if (lov) - OBD_FREE(lov, strlen(lov) + 1); - if (mds_sec) - OBD_FREE(mds_sec, strlen(mds_sec) + 1); - if (oss_sec) - OBD_FREE(oss_sec, strlen(oss_sec) + 1); - if (gkc) - OBD_FREE(gkc, strlen(gkc) + 1); - - return err; -} /* ll_read_super */ - -static int lustre_process_log(struct lustre_mount_data *lmd, char *profile, - struct config_llog_instance *cfg, int allow_recov) -{ - struct lustre_cfg *lcfg = NULL; - struct lustre_cfg_bufs bufs; - struct portals_cfg pcfg; - char *peer = "MDS_PEER_UUID"; - struct obd_device *obd; - struct lustre_handle md_conn = {0, }; - struct obd_export *exp; - char *name = "mdc_dev"; - class_uuid_t uuid; - struct obd_uuid lmv_uuid; - struct llog_ctxt *ctxt; - int rc = 0, err = 0; - ENTRY; - if (lmd_bad_magic(lmd)) - RETURN(-EINVAL); - - generate_random_uuid(uuid); - class_uuid_unparse(uuid, &lmv_uuid); + err = ll_options(lsi->lsi_lmd->lmd_opts, &sbi->ll_flags); + if (err) + GOTO(out_free, err); - if (lmd->lmd_local_nid) { - PCFG_INIT(pcfg, NAL_CMD_REGISTER_MYNID); - pcfg.pcfg_nal = lmd->lmd_nal; - pcfg.pcfg_nid = lmd->lmd_local_nid; - rc = libcfs_nal_cmd(&pcfg); - if (rc < 0) - GOTO(out, rc); - } + /* Generate a string unique to this super, in case some joker tries + to mount the same fs at two mount points. + Use the address of the super itself.*/ + sprintf(ll_instance, "%p", sb); + cfg.cfg_instance = ll_instance; + cfg.cfg_uuid = lsi->lsi_llsbi->ll_sb_uuid; - if (lmd->lmd_nal == SOCKNAL || - lmd->lmd_nal == OPENIBNAL || - lmd->lmd_nal == IIBNAL || - lmd->lmd_nal == VIBNAL || - lmd->lmd_nal == RANAL) { - PCFG_INIT(pcfg, NAL_CMD_ADD_PEER); - pcfg.pcfg_nal = lmd->lmd_nal; - pcfg.pcfg_nid = lmd->lmd_server_nid; - pcfg.pcfg_id = lmd->lmd_server_ipaddr; - pcfg.pcfg_misc = lmd->lmd_port; - rc = libcfs_nal_cmd(&pcfg); - if (rc < 0) - GOTO(out, rc); + /* set up client obds */ + err = lustre_process_log(sb, profilenm, &cfg); + if (err < 0) { + CERROR("Unable to process log: %d\n", err); + GOTO(out_free, err); } - lustre_cfg_bufs_reset(&bufs, name); - lustre_cfg_bufs_set_string(&bufs, 1, peer); - - lcfg = lustre_cfg_new(LCFG_ADD_UUID, &bufs); - lcfg->lcfg_nal = lmd->lmd_nal; - lcfg->lcfg_nid = lmd->lmd_server_nid; - LASSERT(lcfg->lcfg_nal); - LASSERT(lcfg->lcfg_nid); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err < 0) - GOTO(out_del_conn, err); - - lustre_cfg_bufs_reset(&bufs, name); - lustre_cfg_bufs_set_string(&bufs, 1, OBD_MDC_DEVICENAME); - lustre_cfg_bufs_set_string(&bufs, 2, (char *)lmv_uuid.uuid); - - lcfg = lustre_cfg_new(LCFG_ATTACH, &bufs); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err < 0) - GOTO(out_del_uuid, err); - - lustre_cfg_bufs_reset(&bufs, name); - lustre_cfg_bufs_set_string(&bufs, 1, lmd->lmd_mds); - lustre_cfg_bufs_set_string(&bufs, 2, peer); - - lcfg = lustre_cfg_new(LCFG_SETUP, &bufs); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err < 0) - GOTO(out_detach, err); - - obd = class_name2obd(name); - if (obd == NULL) - GOTO(out_cleanup, rc = -EINVAL); - - rc = obd_set_info(obd->obd_self_export, strlen("sec"), "sec", - strlen(lmd->lmd_mds_security), lmd->lmd_mds_security); - if (rc) - GOTO(out_cleanup, rc); - if (lmd->lmd_pag) { - unsigned long sec_flags = PTLRPC_SEC_FL_PAG; - rc = obd_set_info(obd->obd_self_export, - strlen("sec_flags"), "sec_flags", - sizeof(sec_flags), &sec_flags); - if (rc) - GOTO(out_cleanup, rc); + lprof = class_get_profile(profilenm); + if (lprof == NULL) { + LCONSOLE_ERROR_MSG(0x156, "The client profile '%s' could not be" + " read from the MGS. Does that filesystem " + "exist?\n", profilenm); + GOTO(out_free, err = -EINVAL); } + CDEBUG(D_CONFIG, "Found profile %s: mdc=%s osc=%s\n", profilenm, + lprof->lp_md, lprof->lp_dt); - /* Disable initial recovery on this import */ - rc = obd_set_info(obd->obd_self_export, - strlen("initial_recov"), "initial_recov", - sizeof(allow_recov), &allow_recov); - if (rc) - GOTO(out_cleanup, rc); + OBD_ALLOC(dt, strlen(lprof->lp_dt) + + strlen(ll_instance) + 2); + if (!dt) + GOTO(out_free, err = -ENOMEM); + sprintf(dt, "%s-%s", lprof->lp_dt, ll_instance); - rc = obd_connect(&md_conn, obd, &lmv_uuid, NULL, OBD_OPT_REAL_CLIENT); - if (rc) { - CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, rc); - GOTO(out_cleanup, rc); - } + OBD_ALLOC(md, strlen(lprof->lp_md) + + strlen(ll_instance) + 2); + if (!md) + GOTO(out_free, err = -ENOMEM); + sprintf(md, "%s-%s", lprof->lp_md, ll_instance); - exp = class_conn2export(&md_conn); + /* connections, registrations, sb setup */ + err = client_common_fill_super(sb, md, dt); - ctxt = llog_get_context(&exp->exp_obd->obd_llogs,LLOG_CONFIG_REPL_CTXT); - rc = class_config_process_llog(ctxt, profile, cfg); - if (rc) - CERROR("class_config_process_llog failed: rc = %d\n", rc); +out_free: + if (md) + OBD_FREE(md, strlen(md) + 1); + if (dt) + OBD_FREE(dt, strlen(dt) + 1); + if (err) + ll_put_super(sb); + else + LCONSOLE_WARN("Client %s has started\n", profilenm); - err = obd_disconnect(exp, 0); - - EXIT; -out_cleanup: - lustre_cfg_bufs_reset(&bufs, name); - lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err < 0) - GOTO(out, err); -out_detach: - lustre_cfg_bufs_reset(&bufs, name); - lcfg = lustre_cfg_new(LCFG_DETACH, &bufs); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err < 0) - GOTO(out, err); + RETURN(err); +} /* ll_fill_super */ -out_del_uuid: - lustre_cfg_bufs_reset(&bufs, name); - lustre_cfg_bufs_set_string(&bufs, 1, peer); - lcfg = lustre_cfg_new(LCFG_DEL_UUID, &bufs); - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - -out_del_conn: - if (lmd->lmd_nal == SOCKNAL || - lmd->lmd_nal == OPENIBNAL || - lmd->lmd_nal == IIBNAL || - lmd->lmd_nal == VIBNAL || - lmd->lmd_nal == RANAL) { - int err2; - - PCFG_INIT(pcfg, NAL_CMD_DEL_PEER); - pcfg.pcfg_nal = lmd->lmd_nal; - pcfg.pcfg_nid = lmd->lmd_server_nid; - pcfg.pcfg_flags = 1; /* single_share */ - err2 = libcfs_nal_cmd(&pcfg); - if (err2 && !err) - err = err2; - if (err < 0) - GOTO(out, err); - } -out: - if (rc == 0) - rc = err; - return rc; -} +void lu_context_keys_dump(void); -static void lustre_manual_cleanup(struct ll_sb_info *sbi) +void ll_put_super(struct super_block *sb) { - struct lustre_cfg *lcfg; - struct lustre_cfg_bufs bufs; + struct config_llog_instance cfg; + char ll_instance[sizeof(sb) * 2 + 1]; struct obd_device *obd; - int next = 0; - - while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL) { - int err; - - lustre_cfg_bufs_reset(&bufs, obd->obd_name); - lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs); - err = class_process_config(lcfg); - if (err) { - CERROR("cleanup failed: %s\n", obd->obd_name); - //continue; - } - - lcfg->lcfg_command = LCFG_DETACH; - err = class_process_config(lcfg); - lustre_cfg_free(lcfg); - if (err) { - CERROR("detach failed: %s\n", obd->obd_name); - //continue; - } - } - - if (sbi->ll_lmd != NULL) - class_del_profile(sbi->ll_lmd->lmd_profile); -} - -static int lustre_process_profile(struct super_block *sb, - struct lustre_mount_data *lmd, - char **lov, char **lmv, char **gkc) -{ + struct lustre_sb_info *lsi = s2lsi(sb); struct ll_sb_info *sbi = ll_s2sbi(sb); - struct config_llog_instance cfg; - struct lustre_profile *lprof; - int len, err = 0; + char *profilenm = get_profile_name(sb); + int force = 1, next; ENTRY; - if (!lmd->lmd_profile) - RETURN(0); - - if (lmd->lmd_mds[0] == '\0') { - CERROR("no mds name\n"); - GOTO(out, err = -EINVAL); - } - lmd->lmd_mds_security[sizeof(lmd->lmd_mds_security) - 1] = 0; - lmd->lmd_oss_security[sizeof(lmd->lmd_oss_security) - 1] = 0; - - OBD_ALLOC(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); - if (sbi->ll_lmd == NULL) - GOTO(out, err = -ENOMEM); - memcpy(sbi->ll_lmd, lmd, sizeof(*lmd)); - - /* generate a string unique to this super, let's try the address of the - * super itself. */ - len = (sizeof(sb) * 2) + 1; - OBD_ALLOC(sbi->ll_instance, len); - if (sbi->ll_instance == NULL) - GOTO(out, err = -ENOMEM); - sprintf(sbi->ll_instance, "%p", sb); - - cfg.cfg_instance = sbi->ll_instance; - cfg.cfg_uuid = sbi->ll_sb_uuid; - cfg.cfg_local_nid = lmd->lmd_local_nid; - cfg.cfg_flags |= CFG_MODIFY_UUID_FL; - err = lustre_process_log(lmd, lmd->lmd_profile, &cfg, 0); - if (err < 0) { - CERROR("Unable to process log: %s\n", lmd->lmd_profile); - GOTO(out, err); - } - - lprof = class_get_profile(lmd->lmd_profile); - if (lprof == NULL) { - CERROR("No profile found: %s\n", lmd->lmd_profile); - GOTO(out, err = -EINVAL); - } - - OBD_ALLOC(*lov, strlen(lprof->lp_lov) + - strlen(sbi->ll_instance) + 2); - if (*lov == NULL) - GOTO(out, err = -ENOMEM); - - sprintf(*lov, "%s-%s", lprof->lp_lov, sbi->ll_instance); - - OBD_ALLOC(*lmv, strlen(lprof->lp_lmv) + - strlen(sbi->ll_instance) + 2); - if (*lmv == NULL) - GOTO(out_free_lov, err = -ENOMEM); + CDEBUG(D_VFSTRACE, "VFS Op: sb %p - %s\n", sb, profilenm); - sprintf(*lmv, "%s-%s", lprof->lp_lmv, sbi->ll_instance); + ll_print_capa_stat(sbi); - if (lprof->lp_gkc) { - OBD_ALLOC(*gkc, strlen(lprof->lp_gkc) + - strlen(sbi->ll_instance) + 2); - if (*gkc == NULL) - GOTO(out_free_lmv, err = -ENOMEM); - - sprintf(*gkc, "%s-%s", lprof->lp_gkc, sbi->ll_instance); - } - - RETURN(err); -out_free_lmv: - OBD_FREE(*lmv, strlen(lprof->lp_lmv) + - strlen(sbi->ll_instance) + 2); -out_free_lov: - OBD_FREE(*lov, strlen(lprof->lp_lov) + - strlen(sbi->ll_instance) + 2); -out: - return err; -} + sprintf(ll_instance, "%p", sb); + cfg.cfg_instance = ll_instance; + lustre_end_log(sb, NULL, &cfg); -static int lustre_clean_profile(struct ll_sb_info *sbi, int force_umount) -{ - struct lustre_mount_data *lmd = sbi->ll_lmd; - struct config_llog_instance cfg; - char *cl_prof; - int len, err = 0; - ENTRY; - - if (!lmd) - RETURN(err); - - len = strlen(sbi->ll_lmd->lmd_profile) + sizeof("-clean") + 1; - - if (force_umount) { - CERROR("force umount, doing manual cleanup\n"); - lustre_manual_cleanup(sbi); - GOTO(free_lmd, 0); - - } - if (sbi->ll_instance != NULL) { - cfg.cfg_flags |= CFG_MODIFY_UUID_FL; - cfg.cfg_instance = sbi->ll_instance; - cfg.cfg_uuid = sbi->ll_sb_uuid; - - OBD_ALLOC(cl_prof, len); - if (!cl_prof) { - CERROR("can't allocate memory, " - "skipping processing cleanup profile.\n"); - GOTO(free_lmd, err = -ENOMEM); - } - - sprintf(cl_prof, "%s-clean", lmd->lmd_profile); - err = lustre_process_log(lmd, cl_prof, &cfg, 0); - if (err < 0) { - CERROR("Unable to process log: %s\n", cl_prof); - lustre_manual_cleanup(sbi); + if (sbi->ll_md_exp) { + obd = class_exp2obd(sbi->ll_md_exp); + if (obd) + force = obd->obd_force; + } + + /* We need to set force before the lov_disconnect in + lustre_common_put_super, since l_d cleans up osc's as well. */ + if (force) { + next = 0; + while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, + &next)) != NULL) { + obd->obd_force = force; } - OBD_FREE(cl_prof, len); } - EXIT; -free_lmd: - if (sbi->ll_instance) - OBD_FREE(sbi->ll_instance, strlen(sbi->ll_instance) + 1); - OBD_FREE(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); - return err; -} - -int lustre_fill_super(struct super_block *sb, void *data, int silent) -{ - struct lustre_mount_data * lmd = data; - char *lov = NULL, *lmv = NULL, *gkc = NULL; - struct ll_sb_info *sbi; - int err; - ENTRY; - - CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - if (lmd_bad_magic(lmd)) - RETURN(-EINVAL); - - sbi = lustre_init_sbi(sb); - if (!sbi) - RETURN(-ENOMEM); - - sbi->ll_flags |= LL_SBI_READAHEAD; - err = lustre_process_profile(sb, lmd, &lov, &lmv, &gkc); - if (err) { - CERROR("Can not process the profile err %d \n", err); - GOTO(out_free, err); - } - if (!lov || !lmv) { - CERROR("no osc %p or no mdc %p \n", lov, lmv); - GOTO(out_free, err = -EINVAL); + if (sbi->ll_lcq) { + /* Only if client_common_fill_super succeeded */ + client_common_put_super(sb); } - err = lustre_common_fill_super(sb, lmv, lov, gkc, lmd->lmd_async, - lmd->lmd_mds_security, - lmd->lmd_oss_security, - &lmd->lmd_nllu, lmd->lmd_pag, - &lmd->lmd_remote_flag); - - if (err) - GOTO(out_free, err); + next = 0; + while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) !=NULL) { + class_manual_cleanup(obd); + } - EXIT; -out_dev: - if (lmv) - OBD_FREE(lmv, strlen(lmv) + 1); - if (lov) - OBD_FREE(lov, strlen(lov) + 1); - if (gkc) - OBD_FREE(gkc, strlen(gkc) + 1); - - return err; -out_free: - lustre_clean_profile(sbi, 0); - lustre_free_sbi(sb); - goto out_dev; + if (profilenm) + class_del_profile(profilenm); -} /* lustre_fill_super */ - -void lustre_put_super(struct super_block *sb) -{ - struct obd_device *obd; - struct ll_sb_info *sbi = ll_s2sbi(sb); - int force_umount = 0; - ENTRY; - - CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - obd = class_exp2obd(sbi->ll_md_exp); - if (obd) - force_umount = obd->obd_no_recov; - obd = NULL; + ll_free_sbi(sb); + lsi->lsi_llsbi = NULL; lustre_common_put_super(sb); - lustre_clean_profile(sbi, force_umount); - lustre_free_sbi(sb); - EXIT; -} /* lustre_put_super */ + cl_env_cache_purge(~0); -int ll_process_config_update(struct ll_sb_info *sbi, int clean) -{ - struct lustre_mount_data *lmd = sbi->ll_lmd; - char *profile = lmd->lmd_profile, *name = NULL; - struct config_llog_instance cfg; - int rc, namelen = 0, version; - struct llog_ctxt *ctxt; - ENTRY; - - if (profile == NULL) - RETURN(0); - if (lmd == NULL) { - CERROR("Client not mounted with zero-conf; cannot " - "process update log.\n"); - RETURN(0); - } + LCONSOLE_WARN("client %s umount complete\n", ll_instance); - cfg.cfg_instance = sbi->ll_instance; - cfg.cfg_uuid = sbi->ll_sb_uuid; - cfg.cfg_local_nid = lmd->lmd_local_nid; - cfg.cfg_flags |= CFG_MODIFY_UUID_FL; - namelen = strlen(profile) + 20; /* -clean-######### */ - OBD_ALLOC(name, namelen); - if (name == NULL) - RETURN(-ENOMEM); + cfs_module_put(); - if (clean) { - version = sbi->ll_config_version - 1; - sprintf(name, "%s-clean-%d", profile, version); - } else { - version = sbi->ll_config_version + 1; - sprintf(name, "%s-%d", profile, version); - } - - CWARN("Applying configuration log %s\n", name); - - ctxt = llog_get_context(&sbi->ll_md_exp->exp_obd->obd_llogs, - LLOG_CONFIG_REPL_CTXT); - rc = class_config_process_llog(ctxt, name, &cfg); - if (rc == 0) - sbi->ll_config_version = version; - CWARN("Finished applying configuration log %s: %d\n", name, rc); - - if (rc == 0 && clean == 0) { - struct lov_desc desc; - __u32 valsize; - int rc = 0; - - valsize = sizeof(desc); - rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, - "lovdesc", &valsize, &desc); - - rc = obd_init_ea_size(sbi->ll_md_exp, - obd_size_diskmd(sbi->ll_dt_exp, NULL), - (desc.ld_tgt_count * - sizeof(struct llog_cookie))); - } - OBD_FREE(name, namelen); - RETURN(rc); -} + EXIT; +} /* client_put_super */ struct inode *ll_inode_from_lock(struct ldlm_lock *lock) { struct inode *inode = NULL; - /* NOTE: we depend on atomic igrab() -bzzz */ lock_res_and_lock(lock); if (lock->l_ast_data) { @@ -1117,84 +1048,61 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock) if (lli->lli_inode_magic == LLI_INODE_MAGIC) { inode = igrab(lock->l_ast_data); } else { - struct timeval now; - do_gettimeofday(&now); inode = lock->l_ast_data; - LDLM_ERROR(lock, "granted at %lu.%lu, now %lu.%lu", - lock->l_enqueued_time.tv_sec, - lock->l_enqueued_time.tv_usec, - now.tv_sec, now.tv_usec); - CDEBUG(inode->i_state & I_FREEING ? D_INFO : D_WARNING, - "l_ast_data %p is bogus: magic %0x8\n", - lock->l_ast_data, lli->lli_inode_magic); - CDEBUG(D_ERROR, "i_state = 0x%lx, l_ast_data %p is bogus: magic %0x8\n", - inode->i_state, lock->l_ast_data, lli->lli_inode_magic); + ldlm_lock_debug(NULL, inode->i_state & I_FREEING ? + D_INFO : D_WARNING, + lock, __FILE__, __func__, __LINE__, + "l_ast_data %p is bogus: magic %08x", + lock->l_ast_data, lli->lli_inode_magic); inode = NULL; - unlock_res_and_lock(lock); - LBUG(); } } unlock_res_and_lock(lock); return inode; } -int null_if_equal(struct ldlm_lock *lock, void *data) +static int null_if_equal(struct ldlm_lock *lock, void *data) { if (data == lock->l_ast_data) { lock->l_ast_data = NULL; if (lock->l_req_mode != lock->l_granted_mode) - LDLM_ERROR(lock,"clearing inode with ungranted lock\n"); + LDLM_ERROR(lock,"clearing inode with ungranted lock"); } return LDLM_ITER_CONTINUE; } -static void remote_acl_free(struct remote_acl *racl); - void ll_clear_inode(struct inode *inode) { - struct lustre_id id; struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); - struct obd_capa *ocapa, *tmp; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - LASSERT(ll_is_inode_dirty(inode) == 0); - ll_inode2id(&id, inode); - - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); - md_change_cbdata(sbi->ll_md_exp, &id, null_if_equal, inode); + if (S_ISDIR(inode->i_mode)) { + /* these should have been cleared in ll_file_release */ + LASSERT(lli->lli_sai == NULL); + LASSERT(lli->lli_opendir_key == NULL); + LASSERT(lli->lli_opendir_pid == 0); + } + + ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK; + md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode), + null_if_equal, inode); LASSERT(!lli->lli_open_fd_write_count); LASSERT(!lli->lli_open_fd_read_count); LASSERT(!lli->lli_open_fd_exec_count); + if (lli->lli_mds_write_och) - ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE); + ll_md_real_close(inode, FMODE_WRITE); if (lli->lli_mds_exec_och) - ll_md_real_close(sbi->ll_md_exp, inode, FMODE_EXEC); + ll_md_real_close(inode, FMODE_EXEC); if (lli->lli_mds_read_och) - ll_md_real_close(sbi->ll_md_exp, inode, FMODE_READ); - if (lli->lli_smd) - obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd, - null_if_equal, inode); - - if (lli->lli_smd) { - obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd); - lli->lli_smd = NULL; - } - - if (lli->lli_mea) { - obd_free_memmd(sbi->ll_md_exp, - (struct lov_stripe_md **) &lli->lli_mea); - lli->lli_mea = NULL; - } - - LASSERT(sbi->ll_crypto_info != NULL); - ll_crypto_destroy_inode_key(inode); + ll_md_real_close(inode, FMODE_READ); if (lli->lli_symlink_name) { OBD_FREE(lli->lli_symlink_name, @@ -1202,27 +1110,150 @@ void ll_clear_inode(struct inode *inode) lli->lli_symlink_name = NULL; } - if (lli->lli_posix_acl) { - LASSERT(lli->lli_remote_acl == NULL); + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { + LASSERT(lli->lli_posix_acl == NULL); + if (lli->lli_remote_perms) { + free_rmtperm_hash(lli->lli_remote_perms); + lli->lli_remote_perms = NULL; + } + } +#ifdef CONFIG_FS_POSIX_ACL + else if (lli->lli_posix_acl) { + LASSERT(atomic_read(&lli->lli_posix_acl->a_refcount) == 1); + LASSERT(lli->lli_remote_perms == NULL); posix_acl_release(lli->lli_posix_acl); lli->lli_posix_acl = NULL; } +#endif + lli->lli_inode_magic = LLI_INODE_DEAD; - if (lli->lli_remote_acl) { - LASSERT(lli->lli_posix_acl == NULL); - remote_acl_free(lli->lli_remote_acl); - lli->lli_remote_acl = NULL; +#ifdef HAVE_EXPORT___IGET + spin_lock(&sbi->ll_deathrow_lock); + list_del_init(&lli->lli_dead_list); + spin_unlock(&sbi->ll_deathrow_lock); +#endif + ll_clear_inode_capas(inode); + /* + * XXX This has to be done before lsm is freed below, because + * cl_object still uses inode lsm. + */ + cl_inode_fini(inode); + + if (lli->lli_smd) { + obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd); + lli->lli_smd = NULL; } - list_for_each_entry_safe(ocapa, tmp, &lli->lli_capas, u.client.lli_list) - capa_put(ocapa); - LASSERT(!mapping_has_pages(inode->i_mapping)); - lli->lli_inode_magic = LLI_INODE_DEAD; - EXIT; } +int ll_md_setattr(struct inode *inode, struct md_op_data *op_data, + struct md_open_data **mod) +{ + struct lustre_md md; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *request = NULL; + int rc; + ENTRY; + + op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + + rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, + &request, mod); + if (rc) { + ptlrpc_req_finished(request); + if (rc == -ENOENT) { + inode->i_nlink = 0; + /* Unlinked special device node? Or just a race? + * Pretend we done everything. */ + if (!S_ISREG(inode->i_mode) && + !S_ISDIR(inode->i_mode)) + rc = inode_setattr(inode, &op_data->op_attr); + } else if (rc != -EPERM && rc != -EACCES && rc != -ETXTBSY) { + CERROR("md_setattr fails: rc = %d\n", rc); + } + RETURN(rc); + } + + rc = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp, + sbi->ll_md_exp, &md); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + + /* We call inode_setattr to adjust timestamps. + * If there is at least some data in file, we cleared ATTR_SIZE + * above to avoid invoking vmtruncate, otherwise it is important + * to call vmtruncate in inode_setattr to update inode->i_size + * (bug 6196) */ + rc = inode_setattr(inode, &op_data->op_attr); + + /* Extract epoch data if obtained. */ + op_data->op_handle = md.body->handle; + op_data->op_ioepoch = md.body->ioepoch; + + ll_update_inode(inode, &md); + ptlrpc_req_finished(request); + + RETURN(rc); +} + +/* Close IO epoch and send Size-on-MDS attribute update. */ +static int ll_setattr_done_writing(struct inode *inode, + struct md_op_data *op_data, + struct md_open_data *mod) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc = 0; + ENTRY; + + LASSERT(op_data != NULL); + if (!S_ISREG(inode->i_mode)) + RETURN(0); + + CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n", + op_data->op_ioepoch, PFID(&lli->lli_fid)); + + op_data->op_flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE; + rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, mod); + if (rc == -EAGAIN) { + /* MDS has instructed us to obtain Size-on-MDS attribute + * from OSTs and send setattr to back to MDS. */ + rc = ll_sizeonmds_update(inode, &op_data->op_handle, + op_data->op_ioepoch); + } else if (rc) { + CERROR("inode %lu mdc truncate failed: rc = %d\n", + inode->i_ino, rc); + } + RETURN(rc); +} + +static int ll_setattr_do_truncate(struct inode *inode, loff_t size) +{ + struct obd_capa *capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC); + int rc; + + rc = cl_setattr_do_truncate(inode, size, capa); + ll_truncate_free_capa(capa); + return rc; +} + +static int ll_setattr_ost(struct inode *inode) +{ + struct obd_capa *capa = ll_mdscapa_get(inode); + int rc; + + rc = cl_setattr_ost(inode, capa); + capa_put(capa); + + return rc; +} + /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. @@ -1238,17 +1269,17 @@ void ll_clear_inode(struct inode *inode) */ int ll_setattr_raw(struct inode *inode, struct iattr *attr) { - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct ll_inode_info *lli = ll_i2info(inode); - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ptlrpc_request *request = NULL; - struct mdc_op_data *op_data; + struct lov_stripe_md *lsm = lli->lli_smd; + struct md_op_data *op_data = NULL; + struct md_open_data *mod = NULL; int ia_valid = attr->ia_valid; - int err, rc = 0; + int rc = 0, rc1 = 0; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino); - lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETATTR); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu valid %x\n", inode->i_ino, + attr->ia_valid); + ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_SETATTR, 1); if (ia_valid & ATTR_SIZE) { if (attr->ia_size > ll_file_maxbytes(inode)) { @@ -1262,7 +1293,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* POSIX: check before ATTR_*TIME_SET set (from inode_change_ok) */ if (ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET)) { - if (current->fsuid != inode->i_uid && !capable(CAP_FOWNER)) + if (current->fsuid != inode->i_uid && + !cfs_capable(CFS_CAP_FOWNER)) RETURN(-EPERM); } @@ -1279,860 +1311,408 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) attr->ia_mtime = CURRENT_TIME; attr->ia_valid |= ATTR_MTIME_SET; } + if ((attr->ia_valid & ATTR_CTIME) && !(attr->ia_valid & ATTR_MTIME)) { + /* To avoid stale mtime on mds, obtain it from ost and send + to mds. */ + rc = cl_glimpse_size(inode); + if (rc) + RETURN(rc); + + attr->ia_valid |= ATTR_MTIME_SET | ATTR_MTIME; + attr->ia_mtime = inode->i_mtime; + } if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n", LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime), - LTIME_S(CURRENT_TIME)); - - /* If only OST attributes being set on objects, don't do MDS RPC. - * In that case, we need to check permissions and update the local - * inode ourselves so we can call obdo_from_inode() always. */ - if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN /*| ATTR_RAW*/) : ~0)) { - struct lustre_md md; - void *key = NULL; - int key_size = 0; - - OBD_ALLOC(op_data, sizeof(*op_data)); - if (op_data == NULL) - RETURN(-ENOMEM); - ll_inode2mdc_data(op_data, inode, (OBD_MD_FLID | OBD_MD_MEA)); - - if (ia_valid & (ATTR_UID | ATTR_GID | ATTR_MODE)) { - rc = ll_crypto_get_mac(inode, attr, NULL, 0, &key, - &key_size); - if (rc) { - CERROR("can not get right mac, rc=%d\n", rc); - if (key && key_size) - OBD_FREE(key, key_size); - RETURN(rc); - } - } - rc = md_setattr(sbi->ll_md_exp, op_data, - attr, key, key_size, NULL, 0, NULL, - 0, &request); - OBD_FREE(op_data, sizeof(*op_data)); - - if (key && key_size) - OBD_FREE(key, key_size); - if (rc) { - ptlrpc_req_finished(request); - if (rc != -EPERM && rc != -EACCES) - CERROR("md_setattr fails: rc = %d\n", rc); - RETURN(rc); - } - rc = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, - sbi->ll_dt_exp, &md); - if (rc) { - ptlrpc_req_finished(request); - RETURN(rc); - } - - if (attr->ia_valid & ATTR_SIZE) { - rc = ll_set_trunc_capa(request, 0, inode); - if (rc) { - ptlrpc_req_finished(request); - RETURN(rc); - } - } + cfs_time_current_sec()); - /* We call inode_setattr to adjust timestamps, but we first - * clear ATTR_SIZE to avoid invoking vmtruncate. - * - * NB: ATTR_SIZE will only be set at this point if the size - * resides on the MDS, ie, this file has no objects. */ + /* NB: ATTR_SIZE will only be set after this point if the size + * resides on the MDS, ie, this file has no objects. */ + if (lsm) attr->ia_valid &= ~ATTR_SIZE; - /* - * assigning inode_setattr() to @err to disable warning that - * function's result should be checked by by caller. error is - * impossible here, as vmtruncate() control path is disabled. - */ - err = inode_setattr(inode, attr); - ll_update_inode(inode, &md); - ptlrpc_req_finished(request); - - if (!lsm || !S_ISREG(inode->i_mode)) { - CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); - RETURN(0); - } - } else { - /* The OST doesn't check permissions, but the alternative is - * a gratuitous RPC to the MDS. We already rely on the client - * to do read/write/truncate permission checks, so is mtime OK? - */ - if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) { - /* from sys_utime() */ - if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { - if (current->fsuid != inode->i_uid && - (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0) - RETURN(rc); - } else { - /* from inode_change_ok() */ - if (current->fsuid != inode->i_uid && - !capable(CAP_FOWNER)) - RETURN(-EPERM); - } - } - - if (lsm) - attr->ia_valid &= ~ATTR_SIZE; - - /* won't invoke vmtruncate, as we already cleared ATTR_SIZE */ - err = inode_setattr(inode, attr); - /* - * assigning inode_setattr() to @err to disable warning that - * function's result should be checked by by caller. error is - * impossible here, as vmtruncate() control path is disabled. - */ - } - - /* We really need to get our PW lock before we change inode->i_size. - * If we don't we can race with other i_size updaters on our node, like - * ll_file_read. We can also race with i_size propogation to other - * nodes through dirtying and writeback of final cached pages. This - * last one is especially bad for racing o_append users on other - * nodes. */ - if (ia_valid & ATTR_SIZE) { - ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, - OBD_OBJECT_EOF } }; - struct lustre_handle lockh = { 0 }; - int err, ast_flags = 0; - /* XXX when we fix the AST intents to pass the discard-range - * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA - * XXX here. */ - if (attr->ia_size == 0) - ast_flags = LDLM_AST_DISCARD_DATA; - - rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, - ast_flags, &ll_i2sbi(inode)->ll_seek_stime); - - if (rc != 0) - RETURN(rc); - - down(&lli->lli_size_sem); - lli->lli_size_pid = current->pid; - rc = vmtruncate(inode, attr->ia_size); - if (rc != 0) { - LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); - lli->lli_size_pid = 0; - up(&lli->lli_size_sem); - } - - err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); - if (err) { - CERROR("ll_extent_unlock failed: %d\n", err); - if (!rc) - rc = err; - } - } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET | ATTR_UID | ATTR_GID)) { - struct obdo *oa = NULL; - - CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - inode->i_ino, LTIME_S(attr->ia_mtime)); - - oa = obdo_alloc(); - if (oa == NULL) - RETURN(-ENOMEM); - - oa->o_id = lsm->lsm_object_id; - oa->o_gr = lsm->lsm_object_gr; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; - - /* adding uid and gid, needed for quota */ - if (ia_valid & ATTR_UID) { - oa->o_uid = inode->i_uid; - oa->o_valid |= OBD_MD_FLUID; - } - - if (ia_valid & ATTR_GID) { - oa->o_gid = inode->i_gid; - oa->o_valid |= OBD_MD_FLGID; - } - - *(obdo_id(oa)) = lli->lli_id; - oa->o_valid |= OBD_MD_FLIFID; - - obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL, NULL); - obdo_free(oa); - if (rc) - CERROR("obd_setattr fails: rc = %d\n", rc); - } - - RETURN(rc); -} + /* We always do an MDS RPC, even if we're only changing the size; + * only the MDS knows whether truncate() should fail with -ETXTBUSY */ -int ll_setattr(struct dentry *de, struct iattr *attr) -{ - LASSERT(de->d_inode); - return ll_setattr_raw(de->d_inode, attr); -} - -int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, - unsigned long max_age) -{ - struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_statfs obd_osfs; - int rc; - ENTRY; - - rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age); - if (rc) { - CERROR("obd_statfs fails: rc = %d\n", rc); - RETURN(rc); - } - - osfs->os_type = sb->s_magic; - - CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", - osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - - rc = obd_statfs(class_exp2obd(sbi->ll_dt_exp), &obd_osfs, max_age); - if (rc) { - CERROR("obd_statfs fails: rc = %d\n", rc); - RETURN(rc); - } - - CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", - obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree, - obd_osfs.os_files); - - osfs->os_blocks = obd_osfs.os_blocks; - osfs->os_bfree = obd_osfs.os_bfree; - osfs->os_bavail = obd_osfs.os_bavail; - - /* If we don't have as many objects free on the OST as inodes - * on the MDS, we reduce the total number of inodes to - * compensate, so that the "inodes in use" number is correct. - */ - if (obd_osfs.os_ffree < osfs->os_ffree) { - osfs->os_files = (osfs->os_files - osfs->os_ffree) + - obd_osfs.os_ffree; - osfs->os_ffree = obd_osfs.os_ffree; - } - - RETURN(rc); -} + OBD_ALLOC_PTR(op_data); + if (op_data == NULL) + RETURN(-ENOMEM); -int ll_statfs(struct super_block *sb, struct kstatfs *sfs) -{ - struct obd_statfs osfs; - int rc; + memcpy(&op_data->op_attr, attr, sizeof(*attr)); - CDEBUG(D_VFSTRACE, "VFS Op: superblock %p\n", sb); - lprocfs_counter_incr(ll_s2sbi(sb)->ll_stats, LPROC_LL_STAFS); + /* Open epoch for truncate. */ + if ((ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) && + (ia_valid & ATTR_SIZE)) + op_data->op_flags = MF_EPOCH_OPEN; - /* For now we will always get up-to-date statfs values, but in the - * future we may allow some amount of caching on the client (e.g. - * from QOS or lprocfs updates). */ - rc = ll_statfs_internal(sb, &osfs, jiffies - 1); + rc = ll_md_setattr(inode, op_data, &mod); if (rc) - return rc; - - statfs_unpack(sfs, &osfs); - - if (sizeof(sfs->f_blocks) == 4) { - while (osfs.os_blocks > ~0UL) { - sfs->f_bsize <<= 1; - - osfs.os_blocks >>= 1; - osfs.os_bfree >>= 1; - osfs.os_bavail >>= 1; - } - } - - sfs->f_blocks = osfs.os_blocks; - sfs->f_bfree = osfs.os_bfree; - sfs->f_bavail = osfs.os_bavail; - - return 0; -} - - -/******************************** - * remote acl * - ********************************/ - -static struct remote_acl *remote_acl_alloc(void) -{ - struct remote_acl *racl; - int i; - - OBD_ALLOC(racl, sizeof(*racl)); - if (!racl) - return NULL; - - spin_lock_init(&racl->ra_lock); - init_MUTEX(&racl->ra_update_sem); - - for (i = 0; i < REMOTE_ACL_HASHSIZE; i++) - INIT_LIST_HEAD(&racl->ra_perm_cache[i]); - - return racl; -} - -/* - * caller should guarantee no race here. - */ -static void remote_perm_flush_xperms(struct lustre_remote_perm *perm) -{ - struct remote_perm_setxid *xperm; - - while (!list_empty(&perm->lrp_setxid_perms)) { - xperm = list_entry(perm->lrp_setxid_perms.next, - struct remote_perm_setxid, - list); - list_del(&xperm->list); - OBD_FREE(xperm, sizeof(*xperm)); - } -} + GOTO(out, rc); -/* - * caller should guarantee no race here. - */ -static void remote_acl_flush(struct remote_acl *racl) -{ - struct list_head *head; - struct lustre_remote_perm *perm, *tmp; - int i; - - for (i = 0; i < REMOTE_ACL_HASHSIZE; i++) { - head = &racl->ra_perm_cache[i]; - - list_for_each_entry_safe(perm, tmp, head, lrp_list) { - remote_perm_flush_xperms(perm); - list_del(&perm->lrp_list); - OBD_FREE(perm, sizeof(*perm)); - } + ll_ioepoch_open(lli, op_data->op_ioepoch); + if (!lsm || !S_ISREG(inode->i_mode)) { + CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); + GOTO(out, rc = 0); } -} - -static void remote_acl_free(struct remote_acl *racl) -{ - if (!racl) - return; - - down(&racl->ra_update_sem); - spin_lock(&racl->ra_lock); - remote_acl_flush(racl); - spin_unlock(&racl->ra_lock); - up(&racl->ra_update_sem); - OBD_FREE(racl, sizeof(*racl)); -} - -static inline int remote_acl_hashfunc(__u32 id) -{ - return (id & (REMOTE_ACL_HASHSIZE - 1)); -} - -static -int __remote_acl_check(struct remote_acl *racl, unsigned int *perm) -{ - struct list_head *head; - struct lustre_remote_perm *lperm; - struct remote_perm_setxid *xperm; - int found = 0, rc = -ENOENT; - - LASSERT(racl); - head = &racl->ra_perm_cache[remote_acl_hashfunc(current->uid)]; - spin_lock(&racl->ra_lock); - - list_for_each_entry(lperm, head, lrp_list) { - if (lperm->lrp_auth_uid == current->uid) { - found = 1; - break; - } - } - - if (!found) - goto out; - - if (lperm->lrp_auth_uid == current->fsuid && - lperm->lrp_auth_gid == current->fsgid) { - if (lperm->lrp_valid) { - *perm = lperm->lrp_perm; - rc = 0; - } - goto out; - } else if ((!lperm->lrp_setuid && - lperm->lrp_auth_uid != current->fsuid) || - (!lperm->lrp_setgid && - lperm->lrp_auth_gid != current->fsgid)) { - *perm = 0; - rc = 0; - goto out; - } - - list_for_each_entry(xperm, &lperm->lrp_setxid_perms, list) { - if (xperm->uid == current->fsuid && - xperm->gid == current->fsgid) { - *perm = xperm->perm; - rc = 0; - goto out; - } - } - -out: - spin_unlock(&racl->ra_lock); - return rc; -} - -static -int __remote_acl_update(struct remote_acl *racl, - struct mds_remote_perm *mperm, - struct lustre_remote_perm *lperm, - struct remote_perm_setxid *xperm) -{ - struct list_head *head; - struct lustre_remote_perm *lp; - struct remote_perm_setxid *xp; - int found = 0, setuid = 0, setgid = 0; - - LASSERT(racl); - LASSERT(mperm); - LASSERT(lperm); - LASSERT(current->uid == mperm->mrp_auth_uid); - - if (current->fsuid != mperm->mrp_auth_uid) - setuid = 1; - if (current->fsgid != mperm->mrp_auth_gid) - setgid = 1; - - head = &racl->ra_perm_cache[remote_acl_hashfunc(current->uid)]; - spin_lock(&racl->ra_lock); - - list_for_each_entry(lp, head, lrp_list) { - if (lp->lrp_auth_uid == current->uid) { - found = 1; - break; - } - } - - if (found) { - OBD_FREE(lperm, sizeof(*lperm)); - - if (!lp->lrp_valid && !setuid && !setgid) { - lp->lrp_perm = mperm->mrp_perm; - lp->lrp_valid = 1; - } - - /* sanity check for changes of setxid rules */ - if ((lp->lrp_setuid != 0) != (mperm->mrp_allow_setuid != 0)) { - CWARN("setuid changes: %d => %d\n", - (lp->lrp_setuid != 0), - (mperm->mrp_allow_setuid != 0)); - lp->lrp_setuid = (mperm->mrp_allow_setuid != 0); - } - - if ((lp->lrp_setgid != 0) != (mperm->mrp_allow_setgid != 0)) { - CWARN("setgid changes: %d => %d\n", - (lp->lrp_setgid != 0), - (mperm->mrp_allow_setgid != 0)); - lp->lrp_setgid = (mperm->mrp_allow_setgid != 0); - } - - if (!lp->lrp_setuid && !lp->lrp_setgid && - !list_empty(&lp->lrp_setxid_perms)) { - remote_perm_flush_xperms(lp); - } - } else { - /* initialize lperm and linked into hashtable - */ - INIT_LIST_HEAD(&lperm->lrp_setxid_perms); - lperm->lrp_auth_uid = mperm->mrp_auth_uid; - lperm->lrp_auth_gid = mperm->mrp_auth_gid; - lperm->lrp_setuid = (mperm->mrp_allow_setuid != 0); - lperm->lrp_setgid = (mperm->mrp_allow_setgid != 0); - list_add(&lperm->lrp_list, head); - - if (!setuid && !setgid) { - /* in this case, i'm the authenticated user, - * and mrp_perm is for me. - */ - lperm->lrp_perm = mperm->mrp_perm; - lperm->lrp_valid = 1; - spin_unlock(&racl->ra_lock); - - if (xperm) - OBD_FREE(xperm, sizeof(*xperm)); - return 0; - } - - lp = lperm; - /* fall through */ - } - - LASSERT(lp->lrp_setuid || lp->lrp_setgid || - list_empty(&lp->lrp_setxid_perms)); - - /* if no xperm supplied, we are all done here */ - if (!xperm) { - spin_unlock(&racl->ra_lock); - return 0; - } - - /* whether we allow setuid/setgid */ - if ((!lp->lrp_setuid && setuid) || (!lp->lrp_setgid && setgid)) { - OBD_FREE(xperm, sizeof(*xperm)); - spin_unlock(&racl->ra_lock); - return 0; + if (ia_valid & ATTR_SIZE) + rc = ll_setattr_do_truncate(inode, attr->ia_size); + else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { + CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", + inode->i_ino, LTIME_S(attr->ia_mtime)); + rc = ll_setattr_ost(inode); } - - /* traverse xperm list */ - list_for_each_entry(xp, &lp->lrp_setxid_perms, list) { - if (xp->uid == current->fsuid && - xp->gid == current->fsgid) { - if (xp->perm != mperm->mrp_perm) { - /* actually this should not happen */ - CWARN("perm changed: %o => %o\n", - xp->perm, mperm->mrp_perm); - xp->perm = mperm->mrp_perm; - } - OBD_FREE(xperm, sizeof(*xperm)); - spin_unlock(&racl->ra_lock); - return 0; - } + EXIT; +out: + if (op_data) { + if (op_data->op_ioepoch) + rc1 = ll_setattr_done_writing(inode, op_data, mod); + ll_finish_md_op_data(op_data); } - - /* finally insert this xperm */ - xperm->uid = current->fsuid; - xperm->gid = current->fsgid; - xperm->perm = mperm->mrp_perm; - list_add(&xperm->list, &lp->lrp_setxid_perms); - - spin_unlock(&racl->ra_lock); - return 0; + return rc ? rc : rc1; } -/* - * remote_acl semaphore must be held by caller - */ -static -int remote_acl_update_locked(struct remote_acl *racl, - struct mds_remote_perm *mperm) +int ll_setattr(struct dentry *de, struct iattr *attr) { - struct lustre_remote_perm *lperm; - struct remote_perm_setxid *xperm; - int setuid = 0, setgid = 0; + if ((attr->ia_valid & (ATTR_CTIME|ATTR_SIZE|ATTR_MODE)) == + (ATTR_CTIME|ATTR_SIZE|ATTR_MODE)) + attr->ia_valid |= MDS_OPEN_OWNEROVERRIDE; - might_sleep(); + if ((de->d_inode->i_mode & S_ISUID) && + !(attr->ia_mode & S_ISUID) && + !(attr->ia_valid & ATTR_KILL_SUID)) + attr->ia_valid |= ATTR_KILL_SUID; - if (current->uid != mperm->mrp_auth_uid) { - CERROR("current uid %u while authenticated as %u\n", - current->uid, mperm->mrp_auth_uid); - return -EINVAL; - } + if (((de->d_inode->i_mode & (S_ISGID|S_IXGRP)) == (S_ISGID|S_IXGRP)) && + !(attr->ia_mode & S_ISGID) && + !(attr->ia_valid & ATTR_KILL_SGID)) + attr->ia_valid |= ATTR_KILL_SGID; - if (current->fsuid != mperm->mrp_auth_uid) - setuid = 1; - if (current->fsgid == mperm->mrp_auth_gid) - setgid = 1; + return ll_setattr_raw(de->d_inode, attr); +} - OBD_ALLOC(lperm, sizeof(*lperm)); - if (!lperm) - return -ENOMEM; +int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, + __u64 max_age, __u32 flags) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct obd_statfs obd_osfs; + int rc; + ENTRY; - if ((setuid || setgid) && - !(setuid && !mperm->mrp_allow_setuid) && - !(setgid && !mperm->mrp_allow_setgid)) { - OBD_ALLOC(xperm, sizeof(*xperm)); - if (!xperm) { - OBD_FREE(lperm, sizeof(*lperm)); - return -ENOMEM; - } - } else - xperm = NULL; + rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age, flags); + if (rc) { + CERROR("md_statfs fails: rc = %d\n", rc); + RETURN(rc); + } - return __remote_acl_update(racl, mperm, lperm, xperm); -} + osfs->os_type = sb->s_magic; -/* - * return -EACCES at any error cases - */ -int ll_remote_acl_permission(struct inode *inode, int mode) -{ - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct remote_acl *racl = ll_i2info(inode)->lli_remote_acl; - struct ptlrpc_request *req = NULL; - struct lustre_id id; - struct mds_remote_perm *mperm; - int rc = -EACCES, perm; + CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - if (!racl) - return -EACCES; + if (sbi->ll_flags & LL_SBI_LAZYSTATFS) + flags |= OBD_STATFS_NODELAY; - if (__remote_acl_check(racl, &perm) == 0) { - return ((perm & mode) == mode ? 0 : -EACCES); + rc = obd_statfs_rqset(class_exp2obd(sbi->ll_dt_exp), + &obd_osfs, max_age, flags); + if (rc) { + CERROR("obd_statfs fails: rc = %d\n", rc); + RETURN(rc); } - might_sleep(); + CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree, + obd_osfs.os_files); - /* doing update - */ - down(&racl->ra_update_sem); + osfs->os_bsize = obd_osfs.os_bsize; + osfs->os_blocks = obd_osfs.os_blocks; + osfs->os_bfree = obd_osfs.os_bfree; + osfs->os_bavail = obd_osfs.os_bavail; - /* we might lose the race when obtain semaphore, - * so check again. + /* If we don't have as many objects free on the OST as inodes + * on the MDS, we reduce the total number of inodes to + * compensate, so that the "inodes in use" number is correct. */ - if (__remote_acl_check(racl, &perm) == 0) { - if ((perm & mode) == mode) - rc = 0; - goto out; + if (obd_osfs.os_ffree < osfs->os_ffree) { + osfs->os_files = (osfs->os_files - osfs->os_ffree) + + obd_osfs.os_ffree; + osfs->os_ffree = obd_osfs.os_ffree; } - /* really fetch from mds - */ - ll_inode2id(&id, inode); - if (md_access_check(sbi->ll_md_exp, &id, &req)) - goto out; - - /* status non-zero indicate there's more apparent error - * detected by mds, e.g. didn't allow this user at all. - * we simply ignore and didn't cache it. - */ - if (req->rq_repmsg->status) - goto out; + RETURN(rc); +} +#ifndef HAVE_STATFS_DENTRY_PARAM +int ll_statfs(struct super_block *sb, struct kstatfs *sfs) +{ +#else +int ll_statfs(struct dentry *de, struct kstatfs *sfs) +{ + struct super_block *sb = de->d_sb; +#endif + struct obd_statfs osfs; + int rc; - mperm = lustre_swab_repbuf(req, 1, sizeof(*mperm), - lustre_swab_remote_perm); - LASSERT(mperm); - LASSERT_REPSWABBED(req, 1); + CDEBUG(D_VFSTRACE, "VFS Op: at "LPU64" jiffies\n", get_jiffies_64()); + ll_stats_ops_tally(ll_s2sbi(sb), LPROC_LL_STAFS, 1); - if ((mperm->mrp_perm & mode) == mode) - rc = 0; + /* For now we will always get up-to-date statfs values, but in the + * future we may allow some amount of caching on the client (e.g. + * from QOS or lprocfs updates). */ + rc = ll_statfs_internal(sb, &osfs, cfs_time_current_64() - 1, 0); + if (rc) + return rc; - remote_acl_update_locked(racl, mperm); -out: - if (req) - ptlrpc_req_finished(req); + statfs_unpack(sfs, &osfs); - up(&racl->ra_update_sem); - return rc; -} + /* We need to downshift for all 32-bit kernels, because we can't + * tell if the kernel is being called via sys_statfs64() or not. + * Stop before overflowing f_bsize - in which case it is better + * to just risk EOVERFLOW if caller is using old sys_statfs(). */ + if (sizeof(long) < 8) { + while (osfs.os_blocks > ~0UL && sfs->f_bsize < 0x40000000) { + sfs->f_bsize <<= 1; -int ll_remote_acl_update(struct inode *inode, struct mds_remote_perm *perm) -{ - struct remote_acl *racl = ll_i2info(inode)->lli_remote_acl; - int rc; + osfs.os_blocks >>= 1; + osfs.os_bfree >>= 1; + osfs.os_bavail >>= 1; + } + } - LASSERT(perm); + sfs->f_blocks = osfs.os_blocks; + sfs->f_bfree = osfs.os_bfree; + sfs->f_bavail = osfs.os_bavail; - if (!racl) - return -EACCES; + return 0; +} - down(&racl->ra_update_sem); - rc = remote_acl_update_locked(racl, perm); - up(&racl->ra_update_sem); +void ll_inode_size_lock(struct inode *inode, int lock_lsm) +{ + struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + + lli = ll_i2info(inode); + LASSERT(lli->lli_size_sem_owner != current); + down(&lli->lli_size_sem); + LASSERT(lli->lli_size_sem_owner == NULL); + lli->lli_size_sem_owner = current; + lsm = lli->lli_smd; + LASSERTF(lsm != NULL || lock_lsm == 0, "lsm %p, lock_lsm %d\n", + lsm, lock_lsm); + if (lock_lsm) + lov_stripe_lock(lsm); +} - return rc; +void ll_inode_size_unlock(struct inode *inode, int unlock_lsm) +{ + struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + + lli = ll_i2info(inode); + lsm = lli->lli_smd; + LASSERTF(lsm != NULL || unlock_lsm == 0, "lsm %p, lock_lsm %d\n", + lsm, unlock_lsm); + if (unlock_lsm) + lov_stripe_unlock(lsm); + LASSERT(lli->lli_size_sem_owner == current); + lli->lli_size_sem_owner = NULL; + up(&lli->lli_size_sem); } -void ll_inode_invalidate_acl(struct inode *inode) +static void ll_replace_lsm(struct inode *inode, struct lov_stripe_md *lsm) { - struct ll_sb_info *sbi = ll_i2sbi(inode); struct ll_inode_info *lli = ll_i2info(inode); - if (sbi->ll_remote) { - struct remote_acl *racl = lli->lli_remote_acl; - - LASSERT(!lli->lli_posix_acl); - if (racl) { - down(&racl->ra_update_sem); - spin_lock(&racl->ra_lock); - remote_acl_flush(lli->lli_remote_acl); - spin_unlock(&racl->ra_lock); - up(&racl->ra_update_sem); - } - } else { - /* we can't invalide acl here: suppose we touch a new file - * under a dir, blocking ast on dir will lead to open failure - * on client, although succeed on mds. it's kind of weird, - * the real fix i think is improve client-vfs interaction. - * - * currently we just do nothing here. - */ - return; - - LASSERT(!lli->lli_remote_acl); - spin_lock(&lli->lli_lock); - posix_acl_release(lli->lli_posix_acl); - lli->lli_posix_acl = NULL; - spin_unlock(&lli->lli_lock); - } + dump_lsm(D_INODE, lsm); + dump_lsm(D_INODE, lli->lli_smd); + LASSERTF(lsm->lsm_magic == LOV_MAGIC_JOIN, + "lsm must be joined lsm %p\n", lsm); + obd_free_memmd(ll_i2dtexp(inode), &lli->lli_smd); + CDEBUG(D_INODE, "replace lsm %p to lli_smd %p for inode %lu%u(%p)\n", + lsm, lli->lli_smd, inode->i_ino, inode->i_generation, inode); + lli->lli_smd = lsm; + lli->lli_maxbytes = lsm->lsm_maxbytes; + if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) + lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } void ll_update_inode(struct inode *inode, struct lustre_md *md) { struct ll_inode_info *lli = ll_i2info(inode); + struct mdt_body *body = md->body; struct lov_stripe_md *lsm = md->lsm; - struct mds_body *body = md->body; - struct mea *mea = md->mea; - struct posix_acl *posix_acl = md->posix_acl; struct ll_sb_info *sbi = ll_i2sbi(inode); - struct lustre_key *mkey = md->key; - ENTRY; - LASSERT((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); - - if (md->lsm && md->lsm->lsm_magic != LOV_MAGIC) { - /* check for default striping info for dir. */ - LASSERT((mea != NULL) == ((body->valid & OBD_MD_FLDIREA) != 0)); - } - + LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); if (lsm != NULL) { - LASSERT(lsm->lsm_object_gr > 0); if (lli->lli_smd == NULL) { + if (lsm->lsm_magic != LOV_MAGIC_V1 && + lsm->lsm_magic != LOV_MAGIC_V3 && + lsm->lsm_magic != LOV_MAGIC_JOIN) { + dump_lsm(D_ERROR, lsm); + LBUG(); + } + CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n", + lsm, inode->i_ino, inode->i_generation, inode); + cl_inode_init(inode, md); + /* ll_inode_size_lock() requires it is only + * called with lli_smd != NULL or lock_lsm == 0 + * or we can race between lock/unlock. + * bug 9547 */ lli->lli_smd = lsm; lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { - int i; - if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { - CERROR("lsm mismatch for inode %ld\n", - inode->i_ino); - CERROR("lli_smd:\n"); - dump_lsm(D_ERROR, lli->lli_smd); - CERROR("lsm:\n"); - dump_lsm(D_ERROR, lsm); - LBUG(); - } - /* XXX FIXME -- We should decide on a safer (atomic) and - * more elegant way to update the lsm */ - for (i = 0; i < lsm->lsm_stripe_count; i++) { - lli->lli_smd->lsm_oinfo[i].loi_id = - lsm->lsm_oinfo[i].loi_id; - lli->lli_smd->lsm_oinfo[i].loi_gr = - lsm->lsm_oinfo[i].loi_gr; - lli->lli_smd->lsm_oinfo[i].loi_ost_idx = - lsm->lsm_oinfo[i].loi_ost_idx; - lli->lli_smd->lsm_oinfo[i].loi_ost_gen = - lsm->lsm_oinfo[i].loi_ost_gen; + if (lli->lli_smd->lsm_magic == lsm->lsm_magic && + lli->lli_smd->lsm_stripe_count == + lsm->lsm_stripe_count) { + if (lov_stripe_md_cmp(lli->lli_smd, lsm)) { + CERROR("lsm mismatch for inode %ld\n", + inode->i_ino); + CERROR("lli_smd:\n"); + dump_lsm(D_ERROR, lli->lli_smd); + CERROR("lsm:\n"); + dump_lsm(D_ERROR, lsm); + LBUG(); + } + } else { + cl_inode_init(inode, md); + ll_replace_lsm(inode, lsm); } } - /* bug 2844 - limit i_blksize for broken user-space apps */ - LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize); - inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE); if (lli->lli_smd != lsm) obd_free_memmd(ll_i2dtexp(inode), &lsm); } - if (mea != NULL) { - if (lli->lli_mea == NULL) { - lli->lli_mea = mea; - } else { - if (memcmp(lli->lli_mea, mea, body->eadatasize)) { - CERROR("mea mismatch for inode %lu\n", - inode->i_ino); - LBUG(); - } - } - if (lli->lli_mea != mea) - obd_free_memmd(ll_i2mdexp(inode), - (struct lov_stripe_md **) &mea); - } - - if (body->valid & OBD_MD_FID) - id_assign_fid(&lli->lli_id, &body->id1); - - if (body->valid & OBD_MD_FLID) - id_ino(&lli->lli_id) = id_ino(&body->id1); - - if (body->valid & OBD_MD_FLGENER) - id_gen(&lli->lli_id) = id_gen(&body->id1); - - /* local/remote ACL */ - if (sbi->ll_remote) { - LASSERT(md->posix_acl == NULL); - if (md->remote_perm) { - ll_remote_acl_update(inode, md->remote_perm); - OBD_FREE(md->remote_perm, sizeof(*md->remote_perm)); - md->remote_perm = NULL; - } - } else { - LASSERT(md->remote_perm == NULL); + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { + if (body->valid & OBD_MD_FLRMTPERM) + ll_update_remote_perm(inode, md->remote_perm); + } +#ifdef CONFIG_FS_POSIX_ACL + else if (body->valid & OBD_MD_FLACL) { spin_lock(&lli->lli_lock); - if (posix_acl != NULL) { - if (lli->lli_posix_acl != NULL) - posix_acl_release(lli->lli_posix_acl); - lli->lli_posix_acl = posix_acl; - } + if (lli->lli_posix_acl) + posix_acl_release(lli->lli_posix_acl); + lli->lli_posix_acl = md->posix_acl; spin_unlock(&lli->lli_lock); } +#endif + inode->i_ino = ll_fid_build_ino(sbi, &body->fid1); + inode->i_generation = ll_fid_build_gen(sbi, &body->fid1); - if (body->valid & OBD_MD_FLID) - inode->i_ino = id_ino(&body->id1); - if (body->valid & OBD_MD_FLGENER) - inode->i_generation = id_gen(&body->id1); - if (body->valid & OBD_MD_FLATIME) + if (body->valid & OBD_MD_FLATIME && + body->atime > LTIME_S(inode->i_atime)) LTIME_S(inode->i_atime) = body->atime; - if (body->valid & OBD_MD_FLMTIME && - body->mtime > LTIME_S(inode->i_mtime)) { - CDEBUG(D_INODE, "setting ino %lu mtime from %lu to %u\n", - inode->i_ino, LTIME_S(inode->i_mtime), body->mtime); - LTIME_S(inode->i_mtime) = body->mtime; - } + + /* mtime is always updated with ctime, but can be set in past. + As write and utime(2) may happen within 1 second, and utime's + mtime has a priority over write's one, so take mtime from mds + for the same ctimes. */ if (body->valid & OBD_MD_FLCTIME && - body->ctime > LTIME_S(inode->i_ctime)) + body->ctime >= LTIME_S(inode->i_ctime)) { LTIME_S(inode->i_ctime) = body->ctime; - if (body->valid & OBD_MD_FLMODE) { - inode->i_mode = (inode->i_mode & S_IFMT) | - (body->mode & ~S_IFMT); + if (body->valid & OBD_MD_FLMTIME) { + CDEBUG(D_INODE, "setting ino %lu mtime " + "from %lu to "LPU64"\n", inode->i_ino, + LTIME_S(inode->i_mtime), body->mtime); + LTIME_S(inode->i_mtime) = body->mtime; + } } - if (body->valid & OBD_MD_FLTYPE) { - inode->i_mode = (inode->i_mode & ~S_IFMT) | - (body->mode & S_IFMT); + if (body->valid & OBD_MD_FLMODE) + inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT); + if (body->valid & OBD_MD_FLTYPE) + inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT); + LASSERT(inode->i_mode != 0); + if (S_ISREG(inode->i_mode)) { + inode->i_blkbits = min(PTLRPC_MAX_BRW_BITS + 1, LL_MAX_BLKSIZE_BITS); + } else { + inode->i_blkbits = inode->i_sb->s_blocksize_bits; } +#ifdef HAVE_INODE_BLKSIZE + inode->i_blksize = 1<i_blkbits; +#endif if (body->valid & OBD_MD_FLUID) inode->i_uid = body->uid; if (body->valid & OBD_MD_FLGID) inode->i_gid = body->gid; if (body->valid & OBD_MD_FLFLAGS) - inode->i_flags = body->flags; + inode->i_flags = ll_ext_to_inode_flags(body->flags); if (body->valid & OBD_MD_FLNLINK) inode->i_nlink = body->nlink; if (body->valid & OBD_MD_FLRDEV) -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - inode->i_rdev = body->rdev; -#else inode->i_rdev = old_decode_dev(body->rdev); -#endif - if (body->valid & OBD_MD_FLSIZE) - inode->i_size = body->size; - if (body->valid & OBD_MD_FLBLOCKS) - inode->i_blocks = body->blocks; - - if (body->valid & OBD_MD_FLSIZE) - set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); - - if (body->valid & OBD_MD_FLAUDIT) { - struct ll_sb_info * sbi = ll_s2sbi(inode->i_sb); - if (IS_AUDIT_OP(body->audit, AUDIT_FS)) - sbi->ll_audit_mask = body->audit; - else - lli->lli_audit_mask = body->audit; - } - if (mkey != NULL) { - LASSERT(body->valid & OBD_MD_FLKEY); - ll_crypto_init_inode_key(inode, mkey); + if (body->valid & OBD_MD_FLID) { + /* FID shouldn't be changed! */ + if (fid_is_sane(&lli->lli_fid)) { + LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1), + "Trying to change FID "DFID + " to the "DFID", inode %lu/%u(%p)\n", + PFID(&lli->lli_fid), PFID(&body->fid1), + inode->i_ino, inode->i_generation, inode); + } else + lli->lli_fid = body->fid1; + } + + LASSERT(fid_seq(&lli->lli_fid) != 0); + + if (body->valid & OBD_MD_FLSIZE) { + if (exp_connect_som(ll_i2mdexp(inode)) && + S_ISREG(inode->i_mode) && lli->lli_smd) { + struct lustre_handle lockh; + ldlm_mode_t mode; + + /* As it is possible a blocking ast has been processed + * by this time, we need to check there is an UPDATE + * lock on the client and set LLIF_MDS_SIZE_LOCK holding + * it. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE, + &lockh); + if (mode) { + if (lli->lli_flags & (LLIF_DONE_WRITING | + LLIF_EPOCH_PENDING | + LLIF_SOM_DIRTY)) { + CERROR("ino %lu flags %lu still has " + "size authority! do not trust " + "the size got from MDS\n", + inode->i_ino, lli->lli_flags); + } else { + /* Use old size assignment to avoid + * deadlock bz14138 & bz14326 */ + inode->i_size = body->size; + lli->lli_flags |= LLIF_MDS_SIZE_LOCK; + } + ldlm_lock_decref(&lockh, mode); + } + } else { + /* Use old size assignment to avoid + * deadlock bz14138 & bz14326 */ + inode->i_size = body->size; + } + + if (body->valid & OBD_MD_FLBLOCKS) + inode->i_blocks = body->blocks; } -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - inode->i_dev = (kdev_t)id_group(&lli->lli_id); -#endif - LASSERT(id_fid(&lli->lli_id) != 0); + if (body->valid & OBD_MD_FLMDSCAPA) { + LASSERT(md->mds_capa); + ll_add_capa(inode, md->mds_capa); + } + if (body->valid & OBD_MD_FLOSSCAPA) { + LASSERT(md->oss_capa); + ll_add_capa(inode, md->oss_capa); + } } -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) static struct backing_dev_info ll_backing_dev_info = { .ra_pages = 0, /* No readahead */ +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12)) + .capabilities = 0, /* Does contribute to dirty memory */ +#else .memory_backed = 0, /* Does contribute to dirty memory */ -}; #endif +}; void ll_read_inode2(struct inode *inode, void *opaque) { @@ -2140,18 +1720,13 @@ void ll_read_inode2(struct inode *inode, void *opaque) struct ll_inode_info *lli = ll_i2info(inode); ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, - inode->i_generation, inode); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", + inode->i_ino, inode->i_generation, inode); ll_lli_init(lli); LASSERT(!lli->lli_smd); - if (ll_i2sbi(inode)->ll_remote) { - lli->lli_remote_acl = remote_acl_alloc(); - /* if failed alloc, nobody will be able to access this inode */ - } - /* Core attributes from the MDS first. This is a new inode, and * the VFS doesn't zero times in the core inode so we have to do * it ourselves. They will be overwritten by either MDS or OST @@ -2159,15 +1734,15 @@ void ll_read_inode2(struct inode *inode, void *opaque) LTIME_S(inode->i_mtime) = 0; LTIME_S(inode->i_atime) = 0; LTIME_S(inode->i_ctime) = 0; - inode->i_rdev = 0; ll_update_inode(inode, md); /* OIDEBUG(inode); */ if (S_ISREG(inode->i_mode)) { + struct ll_sb_info *sbi = ll_i2sbi(inode); inode->i_op = &ll_file_inode_operations; - inode->i_fop = &ll_file_operations; + inode->i_fop = sbi->ll_fop; inode->i_mapping->a_ops = &ll_aops; EXIT; } else if (S_ISDIR(inode->i_mode)) { @@ -2181,33 +1756,12 @@ void ll_read_inode2(struct inode *inode, void *opaque) } else { inode->i_op = &ll_special_inode_operations; -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) init_special_inode(inode, inode->i_mode, kdev_t_to_nr(inode->i_rdev)); /* initializing backing dev info. */ inode->i_mapping->backing_dev_info = &ll_backing_dev_info; -#else - init_special_inode(inode, inode->i_mode, inode->i_rdev); -#endif - lli->ll_save_ifop = inode->i_fop; - - if (S_ISCHR(inode->i_mode)) - inode->i_fop = &ll_special_chr_inode_fops; - else if (S_ISBLK(inode->i_mode)) - inode->i_fop = &ll_special_blk_inode_fops; - else if (S_ISFIFO(inode->i_mode)) - inode->i_fop = &ll_special_fifo_inode_fops; - else if (S_ISSOCK(inode->i_mode)) - inode->i_fop = &ll_special_sock_inode_fops; - - CWARN("saved %p, replaced with %p\n", lli->ll_save_ifop, - inode->i_fop); - - if (lli->ll_save_ifop->owner) { - CWARN("%p has owner %p\n", lli->ll_save_ifop, - lli->ll_save_ifop->owner); - } + EXIT; } } @@ -2215,19 +1769,16 @@ void ll_read_inode2(struct inode *inode, void *opaque) void ll_delete_inode(struct inode *inode) { struct ll_sb_info *sbi = ll_i2sbi(inode); - struct lustre_id id; int rc; ENTRY; - ll_inode2id(&id, inode); - - rc = md_delete_inode(sbi->ll_md_exp, &id); + rc = obd_fid_delete(sbi->ll_md_exp, ll_inode2fid(inode)); if (rc) { - CERROR("md_delete_inode() failed, error %d\n", - rc); + CERROR("fid_delete() failed, rc %d\n", rc); } - + truncate_inode_pages(&inode->i_data, 0); clear_inode(inode); + EXIT; } @@ -2241,95 +1792,75 @@ int ll_iocontrol(struct inode *inode, struct file *file, switch(cmd) { case EXT3_IOC_GETFLAGS: { - struct lustre_id id; - __u64 valid = OBD_MD_FLFLAGS; - struct mds_body *body; + struct mdt_body *body; + struct obd_capa *oc; - ll_inode2id(&id, inode); - rc = md_getattr(sbi->ll_md_exp, &id, valid, NULL, NULL, - 0, 0, NULL, &req); + oc = ll_mdscapa_get(inode); + rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, + OBD_MD_FLFLAGS, 0, &req); + capa_put(oc); if (rc) { CERROR("failure %d inode %lu\n", rc, inode->i_ino); RETURN(-abs(rc)); } - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (body->flags & S_APPEND) - flags |= EXT3_APPEND_FL; - if (body->flags & S_IMMUTABLE) - flags |= EXT3_IMMUTABLE_FL; - if (body->flags & S_NOATIME) - flags |= EXT3_NOATIME_FL; + flags = body->flags; - ptlrpc_req_finished (req); + ptlrpc_req_finished(req); RETURN(put_user(flags, (int *)arg)); } case EXT3_IOC_SETFLAGS: { - struct mdc_op_data *op_data; - struct iattr attr; - struct obdo *oa; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct obd_info oinfo = { { { 0 } } }; + struct md_op_data *op_data; if (get_user(flags, (int *)arg)) RETURN(-EFAULT); - oa = obdo_alloc(); - if (!oa) - RETURN(-ENOMEM); - - OBD_ALLOC(op_data, sizeof(*op_data)); - if (op_data == NULL) { - obdo_free(oa); + oinfo.oi_md = lsm; + OBDO_ALLOC(oinfo.oi_oa); + if (!oinfo.oi_oa) RETURN(-ENOMEM); - } - ll_inode2mdc_data(op_data, inode, (OBD_MD_FLID | OBD_MD_MEA)); - memset(&attr, 0x0, sizeof(attr)); - attr.ia_attr_flags = flags; - attr.ia_valid |= ATTR_ATTR_FLAG; + op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = flags; + op_data->op_attr.ia_valid |= ATTR_ATTR_FLAG; rc = md_setattr(sbi->ll_md_exp, op_data, - &attr, NULL, 0, NULL, 0, NULL, 0, &req); - OBD_FREE(op_data, sizeof(*op_data)); - if (rc) { - ptlrpc_req_finished(req); - if (rc != -EPERM && rc != -EACCES) - CERROR("md_setattr fails: rc = %d\n", rc); - obdo_free(oa); + NULL, 0, NULL, 0, &req, NULL); + ll_finish_md_op_data(op_data); + ptlrpc_req_finished(req); + if (rc || lsm == NULL) { + OBDO_FREE(oinfo.oi_oa); RETURN(rc); } - ptlrpc_req_finished(req); - oa->o_id = lsm->lsm_object_id; - oa->o_gr = lsm->lsm_object_gr; - oa->o_flags = flags; - *(obdo_id(oa)) = ll_i2info(inode)->lli_id; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP - | OBD_MD_FLIFID; - - rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL, NULL); - obdo_free(oa); + oinfo.oi_oa->o_id = lsm->lsm_object_id; + oinfo.oi_oa->o_gr = lsm->lsm_object_gr; + oinfo.oi_oa->o_flags = flags; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | + OBD_MD_FLGROUP; + oinfo.oi_capa = ll_mdscapa_get(inode); + + obdo_from_inode(oinfo.oi_oa, inode, + OBD_MD_FLFID | OBD_MD_FLGENER); + rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL); + capa_put(oinfo.oi_capa); + OBDO_FREE(oinfo.oi_oa); if (rc) { if (rc != -EPERM && rc != -EACCES) - CERROR("md_setattr fails: rc = %d\n", rc); + CERROR("md_setattr_async fails: rc = %d\n", rc); RETURN(rc); } - if (flags & EXT3_APPEND_FL) - inode->i_flags |= S_APPEND; - else - inode->i_flags &= ~S_APPEND; - if (flags & EXT3_IMMUTABLE_FL) - inode->i_flags |= S_IMMUTABLE; - else - inode->i_flags &= ~S_IMMUTABLE; - if (flags & EXT3_NOATIME_FL) - inode->i_flags |= S_NOATIME; - else - inode->i_flags &= ~S_NOATIME; - + inode->i_flags = ll_ext_to_inode_flags(flags | + MDS_BFLAG_EXT_FLAGS); RETURN(0); } default: @@ -2339,17 +1870,49 @@ int ll_iocontrol(struct inode *inode, struct file *file, RETURN(0); } -/* this is only called in the case of forced umount. */ +int ll_flush_ctx(struct inode *inode) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + + CDEBUG(D_SEC, "flush context for user %d\n", current->uid); + + obd_set_info_async(sbi->ll_md_exp, + sizeof(KEY_FLUSH_CTX), KEY_FLUSH_CTX, + 0, NULL, NULL); + obd_set_info_async(sbi->ll_dt_exp, + sizeof(KEY_FLUSH_CTX), KEY_FLUSH_CTX, + 0, NULL, NULL); + return 0; +} + +/* umount -f client means force down, don't save state */ +#ifdef HAVE_UMOUNTBEGIN_VFSMOUNT +void ll_umount_begin(struct vfsmount *vfsmnt, int flags) +{ + struct super_block *sb = vfsmnt->mnt_sb; +#else void ll_umount_begin(struct super_block *sb) { +#endif + struct lustre_sb_info *lsi = s2lsi(sb); struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_ioctl_data ioc_data = { 0 }; struct obd_device *obd; + struct obd_ioctl_data ioc_data = { 0 }; ENTRY; - + +#ifdef HAVE_UMOUNTBEGIN_VFSMOUNT + if (!(flags & MNT_FORCE)) { + EXIT; + return; + } +#endif + + /* Tell the MGC we got umount -f */ + lsi->lsi_flags |= LSI_UMOUNT_FORCE; + CDEBUG(D_VFSTRACE, "VFS Op: superblock %p count %d active %d\n", sb, sb->s_count, atomic_read(&sb->s_active)); - + obd = class_exp2obd(sbi->ll_md_exp); if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", @@ -2357,9 +1920,9 @@ void ll_umount_begin(struct super_block *sb) EXIT; return; } - obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, - sizeof(ioc_data), &ioc_data, NULL); + obd->obd_force = 1; + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, sizeof ioc_data, + &ioc_data, NULL); obd = class_exp2obd(sbi->ll_dt_exp); if (obd == NULL) { @@ -2369,117 +1932,251 @@ void ll_umount_begin(struct super_block *sb) return; } - obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, - sizeof(ioc_data), &ioc_data, NULL); + obd->obd_force = 1; + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, sizeof ioc_data, + &ioc_data, NULL); - /* - * really, we'd like to wait until there are no requests outstanding, + /* Really, we'd like to wait until there are no requests outstanding, * and then continue. For now, we just invalidate the requests, - * schedule, and hope. + * schedule() and sleep one second if needed, and hope. */ schedule(); +#ifdef HAVE_UMOUNTBEGIN_VFSMOUNT + if (atomic_read(&vfsmnt->mnt_count) > 2) { + cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, + cfs_time_seconds(1)); + if (atomic_read(&vfsmnt->mnt_count) > 2) + LCONSOLE_WARN("Mount still busy with %d refs! You " + "may try to umount it a bit later\n", + atomic_read(&vfsmnt->mnt_count)); + } +#endif EXIT; } -int ll_prep_inode(struct obd_export *dt_exp, struct obd_export *md_exp, - struct inode **inode, struct ptlrpc_request *req, - int offset, struct super_block *sb) +int ll_remount_fs(struct super_block *sb, int *flags, char *data) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + int err; + __u32 read_only; + + if ((*flags & MS_RDONLY) != (sb->s_flags & MS_RDONLY)) { + read_only = *flags & MS_RDONLY; + err = obd_set_info_async(sbi->ll_md_exp, + sizeof(KEY_READ_ONLY), + KEY_READ_ONLY, sizeof(read_only), + &read_only, NULL); + if (err) { + CERROR("Failed to change the read-only flag during " + "remount: %d\n", err); + return err; + } + + if (read_only) + sb->s_flags |= MS_RDONLY; + else + sb->s_flags &= ~MS_RDONLY; + } + return 0; +} + +int ll_prep_inode(struct inode **inode, + struct ptlrpc_request *req, + struct super_block *sb) { + struct ll_sb_info *sbi = NULL; struct lustre_md md; - int rc = 0; + int rc; + ENTRY; - rc = mdc_req2lustre_md(md_exp, req, offset, dt_exp, &md); + LASSERT(*inode || sb); + sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode); + prune_deathrow(sbi, 1); + memset(&md, 0, sizeof(struct lustre_md)); + + rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp, + sbi->ll_md_exp, &md); if (rc) RETURN(rc); if (*inode) { ll_update_inode(*inode, &md); } else { - LASSERT(sb); - *inode = ll_iget(sb, id_ino(&md.body->id1), &md); - if (*inode == NULL || is_bad_inode(*inode)) { - /* free the lsm if we allocated one above */ - if (md.lsm != NULL) - obd_free_memmd(dt_exp, &md.lsm); - if (md.mea != NULL) - obd_free_memmd(md_exp, - (struct lov_stripe_md**)&md.mea); - rc = -ENOMEM; + LASSERT(sb != NULL); + + /* + * At this point server returns to client's same fid as client + * generated for creating. So using ->fid1 is okay here. + */ + LASSERT(fid_is_sane(&md.body->fid1)); + + *inode = ll_iget(sb, ll_fid_build_ino(sbi, &md.body->fid1),&md); + if (*inode == NULL || IS_ERR(*inode)) { + if (md.lsm) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); +#ifdef CONFIG_FS_POSIX_ACL + if (md.posix_acl) { + posix_acl_release(md.posix_acl); + md.posix_acl = NULL; + } +#endif + rc = IS_ERR(*inode) ? PTR_ERR(*inode) : -ENOMEM; + *inode = NULL; CERROR("new_inode -fatal: rc %d\n", rc); + GOTO(out, rc); } } + rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, + ll_i2info(*inode)->lli_smd); +out: + md_free_lustre_md(sbi->ll_md_exp, &md); RETURN(rc); } -int ll_show_options(struct seq_file *m, struct vfsmount *mnt) +int ll_obd_statfs(struct inode *inode, void *arg) { - struct ll_sb_info *sbi = ll_s2sbi(mnt->mnt_sb); - struct lustre_mount_data *lmd = sbi->ll_lmd; + struct ll_sb_info *sbi = NULL; + struct obd_export *exp; + char *buf = NULL; + struct obd_ioctl_data *data = NULL; + __u32 type; + int len = 0, rc; - if (lmd) { - seq_printf(m, ",mds_sec=%s,oss_sec=%s", - lmd->lmd_mds_security, lmd->lmd_oss_security); - } - seq_printf(m, ",%s", sbi->ll_remote ? "remote" : "local"); - if (sbi->ll_remote && lmd) - seq_printf(m, ",nllu=%u:%u", lmd->lmd_nllu, lmd->lmd_nllg); + if (!inode || !(sbi = ll_i2sbi(inode))) + GOTO(out_statfs, rc = -EINVAL); - if (lmd && lmd->lmd_pag) - seq_printf(m, ",pag"); + rc = obd_ioctl_getdata(&buf, &len, arg); + if (rc) + GOTO(out_statfs, rc); + + data = (void*)buf; + if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2 || + !data->ioc_pbuf1 || !data->ioc_pbuf2) + GOTO(out_statfs, rc = -EINVAL); + + memcpy(&type, data->ioc_inlbuf1, sizeof(__u32)); + if (type == LL_STATFS_MDC) + exp = sbi->ll_md_exp; + else if (type == LL_STATFS_LOV) + exp = sbi->ll_dt_exp; + else + GOTO(out_statfs, rc = -ENODEV); - return 0; + rc = obd_iocontrol(IOC_OBD_STATFS, exp, len, buf, NULL); + if (rc) + GOTO(out_statfs, rc); +out_statfs: + if (buf) + obd_ioctl_freedata(buf, len); + return rc; } -int ll_get_fid(struct obd_export *exp, struct lustre_id *idp, - char *filename, struct lustre_id *ret) +int ll_process_config(struct lustre_cfg *lcfg) { - struct ptlrpc_request *request = NULL; - struct mds_body *body; - int rc; + char *ptr; + void *sb; + struct lprocfs_static_vars lvars; + unsigned long x; + int rc = 0; - rc = md_getattr_lock(exp, idp, filename, strlen(filename) + 1, - OBD_MD_FID, 0, &request); - if (rc < 0) { - CDEBUG(D_INFO, "md_getattr_lock failed on %s: rc %d\n", - filename, rc); - return rc; - } + lprocfs_llite_init_vars(&lvars); - body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); - LASSERT(body != NULL); - LASSERT_REPSWABBED(request, 0); + /* The instance name contains the sb: lustre-client-aacfe000 */ + ptr = strrchr(lustre_cfg_string(lcfg, 0), '-'); + if (!ptr || !*(++ptr)) + return -EINVAL; + if (sscanf(ptr, "%lx", &x) != 1) + return -EINVAL; + sb = (void *)x; + /* This better be a real Lustre superblock! */ + LASSERT(s2lsi((struct super_block *)sb)->lsi_lmd->lmd_magic == LMD_MAGIC); + + /* Note we have not called client_common_fill_super yet, so + proc fns must be able to handle that! */ + rc = class_process_proc_param(PARAM_LLITE, lvars.obd_vars, + lcfg, sb); + if (rc > 0) + rc = 0; + return(rc); +} - *ret = body->id1; - ptlrpc_req_finished(request); +/* this function prepares md_op_data hint for passing ot down to MD stack. */ +struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, + struct inode *i1, struct inode *i2, + const char *name, int namelen, + int mode, __u32 opc, void *data) +{ + LASSERT(i1 != NULL); - return rc; + if (namelen > ll_i2sbi(i1)->ll_namelen) + return ERR_PTR(-ENAMETOOLONG); + + if (op_data == NULL) + OBD_ALLOC_PTR(op_data); + + if (op_data == NULL) + return ERR_PTR(-ENOMEM); + + ll_i2gids(op_data->op_suppgids, i1, i2); + op_data->op_fid1 = *ll_inode2fid(i1); + op_data->op_capa1 = ll_mdscapa_get(i1); + + if (i2) { + op_data->op_fid2 = *ll_inode2fid(i2); + op_data->op_capa2 = ll_mdscapa_get(i2); + } else { + fid_zero(&op_data->op_fid2); + op_data->op_capa2 = NULL; + } + + op_data->op_name = name; + op_data->op_namelen = namelen; + op_data->op_mode = mode; + op_data->op_mod_time = cfs_time_current_sec(); + op_data->op_fsuid = current->fsuid; + op_data->op_fsgid = current->fsgid; + op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_bias = MDS_CHECK_SPLIT; + op_data->op_opc = opc; + op_data->op_mds = 0; + op_data->op_data = data; + + return op_data; } -int ll_flush_cred(struct inode *inode) + +void ll_finish_md_op_data(struct md_op_data *op_data) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - int rc = 0; + capa_put(op_data->op_capa1); + capa_put(op_data->op_capa2); + OBD_FREE_PTR(op_data); +} - /* XXX to avoid adding api, we simply use set_info() interface - * to notify underlying obds. set_info() is more like a ioctl() now... - */ - if (sbi->ll_md_exp) { - rc = obd_set_info(sbi->ll_md_exp, - strlen("flush_cred"), "flush_cred", - 0, NULL); - if (rc) - return rc; - } +int ll_show_options(struct seq_file *seq, struct vfsmount *vfs) +{ + struct ll_sb_info *sbi; - if (sbi->ll_dt_exp) { - rc = obd_set_info(sbi->ll_dt_exp, - strlen("flush_cred"), "flush_cred", - 0, NULL); - if (rc) - return rc; - } + LASSERT((seq != NULL) && (vfs != NULL)); + sbi = ll_s2sbi(vfs->mnt_sb); - return rc; + if (sbi->ll_flags & LL_SBI_NOLCK) + seq_puts(seq, ",nolock"); + + if (sbi->ll_flags & LL_SBI_FLOCK) + seq_puts(seq, ",flock"); + + if (sbi->ll_flags & LL_SBI_LOCALFLOCK) + seq_puts(seq, ",localflock"); + + if (sbi->ll_flags & LL_SBI_USER_XATTR) + seq_puts(seq, ",user_xattr"); + + if (sbi->ll_flags & LL_SBI_ACL) + seq_puts(seq, ",acl"); + + if (sbi->ll_flags & LL_SBI_LAZYSTATFS) + seq_puts(seq, ",lazystatfs"); + + RETURN(0); }