X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fllite_lib.c;h=c17ad638be6cf9a6c201beffa3a03fb79f7bcbe3;hp=791dfbc4a13717be8b6085fdf76ada437afe2276;hb=47072597e51371a541a872fdbaa2782ddda91e86;hpb=19b55ff0decd600807e02f318e3dee0a965e4a47 diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 791dfbc..d0ca61d 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -24,320 +24,452 @@ #define DEBUG_SUBSYSTEM S_LLITE #include +#include #include #include +#include + #include #include #include -#include -#include #include +#include +#include +#include +#include #include "llite_internal.h" kmem_cache_t *ll_file_data_slab; +kmem_cache_t *ll_intent_slab; extern struct address_space_operations ll_aops; extern struct address_space_operations ll_dir_aops; -extern struct super_operations ll_super_operations; #ifndef log2 #define log2(n) ffz(~(n)) #endif -char *ll_read_opt(const char *opt, char *data) +struct ll_sb_info *lustre_init_sbi(struct super_block *sb) { - char *value; - char *retval; + struct ll_sb_info *sbi = NULL; + class_uuid_t uuid; ENTRY; - CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if (strncmp(opt, data, strlen(opt))) - RETURN(NULL); - if ((value = strchr(data, '=')) == NULL) - RETURN(NULL); - - value++; - OBD_ALLOC(retval, strlen(value) + 1); - if (!retval) { - CERROR("out of memory!\n"); + OBD_ALLOC(sbi, sizeof(*sbi)); + if (!sbi) RETURN(NULL); - } - - memcpy(retval, value, strlen(value)+1); - CDEBUG(D_SUPER, "Assigned option: %s, value %s\n", opt, retval); - RETURN(retval); -} - -int ll_set_opt(const char *opt, char *data, int fl) -{ - ENTRY; - CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); - if (strncmp(opt, data, strlen(opt))) - RETURN(0); + spin_lock_init(&sbi->ll_lock); + INIT_LIST_HEAD(&sbi->ll_pglist); + sbi->ll_pglist_gen = 0; + if (num_physpages < SBI_DEFAULT_RA_MAX / 4) + sbi->ll_ra_info.ra_max_pages = num_physpages / 4; else - RETURN(fl); + sbi->ll_ra_info.ra_max_pages = SBI_DEFAULT_RA_MAX; + INIT_LIST_HEAD(&sbi->ll_conn_chain); + INIT_HLIST_HEAD(&sbi->ll_orphan_dentry_list); + INIT_LIST_HEAD(&sbi->ll_mnt_list); + + sema_init(&sbi->ll_gns_sem, 1); + spin_lock_init(&sbi->ll_gns_lock); + INIT_LIST_HEAD(&sbi->ll_gns_sbi_head); + init_waitqueue_head(&sbi->ll_gns_waitq); + init_completion(&sbi->ll_gns_mount_finished); + + /* this later may be reset via /proc/fs/... */ + memcpy(sbi->ll_gns_oname, ".mntinfo", strlen(".mntinfo")); + sbi->ll_gns_oname[strlen(sbi->ll_gns_oname)] = '\0'; + + /* this later may be reset via /proc/fs/... */ + memcpy(sbi->ll_gns_upcall, "/usr/sbin/gns_upcall", + strlen("/usr/sbin/gns_upcall")); + sbi->ll_gns_upcall[strlen(sbi->ll_gns_upcall)] = '\0'; + + /* default values, may be changed via /proc/fs/... */ + sbi->ll_gns_state = LL_GNS_IDLE; + sbi->ll_gns_pending_dentry = NULL; + atomic_set(&sbi->ll_gns_enabled, 1); + sbi->ll_gns_tick = GNS_TICK_TIMEOUT; + sbi->ll_gns_timeout = GNS_MOUNT_TIMEOUT; + + sbi->ll_gns_timer.data = (unsigned long)sbi; + sbi->ll_gns_timer.function = ll_gns_timer_callback; + init_timer(&sbi->ll_gns_timer); + //audit mask + sbi->ll_audit_mask = AUDIT_OFF; + ll_set_sbi(sb, sbi); + + generate_random_uuid(uuid); + class_uuid_unparse(uuid, &sbi->ll_sb_uuid); + RETURN(sbi); } -void ll_options(char *options, char **ost, char **mds, int *flags) +void lustre_free_sbi(struct super_block *sb) { - char *this_char; -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) - char *opt_ptr = options; -#endif + struct ll_sb_info *sbi = ll_s2sbi(sb); ENTRY; - if (!options) { - EXIT; - return; - } - -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - for (this_char = strtok (options, ","); - this_char != NULL; - this_char = strtok (NULL, ",")) { -#else - while ((this_char = strsep (&opt_ptr, ",")) != NULL) { -#endif - CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ((!*ost && (*ost = ll_read_opt("osc", this_char)))|| - (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| - (!(*flags & LL_SBI_NOLCK) && - ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, LL_SBI_NOLCK)))) - continue; + if (sbi != NULL) { + list_del(&sbi->ll_gns_sbi_head); + del_timer(&sbi->ll_gns_timer); + OBD_FREE(sbi, sizeof(*sbi)); } + ll_set_sbi(sb, NULL); EXIT; } -void ll_lli_init(struct ll_inode_info *lli) +int lustre_init_dt_desc(struct ll_sb_info *sbi) { - sema_init(&lli->lli_open_sem, 1); - spin_lock_init(&lli->lli_read_extent_lock); - INIT_LIST_HEAD(&lli->lli_read_extents); - lli->lli_flags = 0; - lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - ll_lldo_init(&lli->lli_dirty); - spin_lock_init(&lli->lli_pg_lock); - INIT_LIST_HEAD(&lli->lli_lc_item); - plist_init(&lli->lli_pl_read); - plist_init(&lli->lli_pl_write); - atomic_set(&lli->lli_in_writepages, 0); -#endif + __u32 valsize; + int rc = 0; + ENTRY; + + valsize = sizeof(sbi->ll_dt_desc); + memset(&sbi->ll_dt_desc, 0, sizeof(sbi->ll_dt_desc)); + rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, + "lovdesc", &valsize, &sbi->ll_dt_desc); + RETURN(rc); } -int ll_fill_super(struct super_block *sb, void *data, int silent) +static int lustre_connect_mds(struct super_block *sb, char *lmv, + struct obd_connect_data *data, + char *mds_security, int async, int pag) { - struct inode *root = 0; - struct obd_device *obd; - struct ll_sb_info *sbi; - char *osc = NULL; - char *mdc = NULL; - int err; - struct ll_fid rootfid; + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct lustre_handle md_conn = {0, }; + struct obd_device *md_obd; struct obd_statfs osfs; - struct ptlrpc_request *request = NULL; - struct ptlrpc_connection *mdc_conn; - struct lustre_md md; - class_uuid_t uuid; - - ENTRY; + unsigned long sec_flags; + __u32 valsize; + int err = 0; + ENTRY; + + md_obd = class_name2obd(lmv); + if (!md_obd) { + CERROR("MDC %s: not setup or attached\n", lmv); + RETURN(-EINVAL); + } - CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - OBD_ALLOC(sbi, sizeof(*sbi)); - if (!sbi) - RETURN(-ENOMEM); + obd_set_info(md_obd->obd_self_export, strlen("async"), "async", + sizeof(async), &async); - INIT_LIST_HEAD(&sbi->ll_conn_chain); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list); - sb->u.generic_sbp = sbi; -#else - INIT_HLIST_HEAD(&sbi->ll_orphan_dentry_list); - spin_lock_init(&sbi->ll_iostats.fis_lock); - ll_s2sbi(sb) = sbi; -#endif - generate_random_uuid(uuid); - class_uuid_unparse(uuid, &sbi->ll_sb_uuid); + if (mds_security == NULL) + mds_security = "null"; + + err = obd_set_info(md_obd->obd_self_export, strlen("sec"), "sec", + strlen(mds_security), mds_security); + + if (err) { + CERROR("LMV %s: failed to set security %s, err %d\n", + lmv, mds_security, err); + RETURN(err); + } - ll_options(data, &osc, &mdc, &sbi->ll_flags); + if (pag) { + sec_flags = PTLRPC_SEC_FL_PAG; + err = obd_set_info(md_obd->obd_self_export, + strlen("sec_flags"), "sec_flags", + sizeof(sec_flags), &sec_flags); + if (err) { + OBD_FREE(data, sizeof(*data)); + RETURN(err); + } + } - if (!osc) { - CERROR("no osc\n"); - GOTO(out_free, err = -EINVAL); + err = obd_connect(&md_conn, md_obd, &sbi->ll_sb_uuid, data, + OBD_OPT_REAL_CLIENT); + if (err == -EBUSY) { + CERROR("An MDS (lmv %s) is performing recovery, of which this" + " client is not a part. Please wait for recovery to " + "complete, abort, or time out.\n", lmv); + GOTO(out, err); + } else if (err) { + CERROR("cannot connect to %s: rc = %d\n", lmv, err); + GOTO(out, err); } - if (!mdc) { - CERROR("no mdc\n"); - GOTO(out_free, err = -EINVAL); + sbi->ll_md_exp = class_conn2export(&md_conn); + + err = obd_statfs(md_obd, &osfs, jiffies - HZ); + if (err) + GOTO(out_disconnect, err); + + if (!osfs.os_bsize) { + CERROR("Invalid block size is detected."); + GOTO(out_disconnect, err); } - obd = class_name2obd(mdc); - if (!obd) { - CERROR("MDC %s: not setup or attached\n", mdc); - GOTO(out_free, err = -EINVAL); + sb->s_magic = LL_SUPER_MAGIC; + sb->s_blocksize = osfs.os_bsize; + sb->s_blocksize_bits = log2(osfs.os_bsize); + sb->s_maxbytes = PAGE_CACHE_MAXBYTES; + + /* in 2.6.x FS is not allowed to form s_dev */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + { + kdev_t devno; + + devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, + strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid)); + + sb->s_dev = devno; } +#endif - err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid); + /* after statfs, we are supposed to have connected to MDSs, + * so it's ok to check remote flag returned. + */ + valsize = sizeof(&sbi->ll_remote); + err = obd_get_info(sbi->ll_md_exp, strlen("remote_flag"), "remote_flag", + &valsize, &sbi->ll_remote); if (err) { - CERROR("cannot connect to %s: rc = %d\n", mdc, err); - GOTO(out_free, err); + CERROR("fail to obtain remote flag\n"); + GOTO(out_disconnect, err); } - err = obd_statfs(obd, &osfs, jiffies - HZ); +out_disconnect: if (err) - GOTO(out_mdc, err); - - LASSERT(osfs.os_bsize); - sb->s_blocksize = osfs.os_bsize; - sb->s_blocksize_bits = log2(osfs.os_bsize); - sb->s_magic = LL_SUPER_MAGIC; - sb->s_maxbytes = PAGE_CACHE_MAXBYTES; + obd_disconnect(sbi->ll_md_exp, 0); +out: + RETURN(err); +} - mdc_conn = sbi2mdc(sbi)->cl_import->imp_connection; +static int lustre_connect_ost(struct super_block *sb, char *lov, + struct obd_connect_data *data, + char *oss_security, int async, int pag) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct lustre_handle dt_conn = {0, }; + struct obd_device *obd = NULL; + unsigned long sec_flags; + int err, mdsize; - obd = class_name2obd(osc); + obd = class_name2obd(lov); if (!obd) { - CERROR("OSC %s: not setup or attached\n", osc); - GOTO(out_mdc, err); + CERROR("OSC %s: not setup or attached\n", lov); + GOTO(out, err = -EINVAL); } + obd_set_info(obd->obd_self_export, strlen("async"), "async", + sizeof(async), &async); + + if (oss_security == NULL) + oss_security = "null"; - err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid); + err = obd_set_info(obd->obd_self_export, strlen("sec"), "sec", + strlen(oss_security), oss_security); if (err) { - CERROR("cannot connect to %s: rc = %d\n", osc, err); - GOTO(out_mdc, err); + CERROR("LOV %s: failed to set security %s, err %d\n", + lov, oss_security, err); + RETURN(err); } - err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + /* FIXME Because of the async nature of file i/o, we never know + * who is actually dirty the pages; and any process have chance + * to trigger dirty-flushing within its own process context. So + * for simplicity we simply use root's credential, we suppose root + * always have credential. + */ + if (pag) + sec_flags = PTLRPC_SEC_FL_PAG; + else + sec_flags = PTLRPC_SEC_FL_OSS; + + err = obd_set_info(obd->obd_self_export, + strlen("sec_flags"), "sec_flags", + sizeof(sec_flags), &sec_flags); if (err) { - CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_osc, err); + OBD_FREE(data, sizeof(*data)); + RETURN(err); + } + + err = obd_connect(&dt_conn, obd, &sbi->ll_sb_uuid, data, 0); + if (err == -EBUSY) { + CERROR("An OST (lov %s) is performing recovery, of which this" + " client is not a part. Please wait for recovery to " + "complete, abort, or time out.\n", lov); + GOTO(out, err); + } else if (err) { + CERROR("cannot connect to %s: rc = %d\n", lov, err); + GOTO(out, err); + } + sbi->ll_dt_exp = class_conn2export(&dt_conn); + + err = lustre_init_dt_desc(sbi); + + if (err) { + CWARN("init dt_desc error %d \n", err); + GOTO(out, err = 0); } - CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id); - sbi->ll_rootino = rootfid.id; + mdsize = obd_size_diskmd(sbi->ll_dt_exp, NULL); + obd_init_ea_size(sbi->ll_md_exp, mdsize, sbi->ll_dt_desc.ld_tgt_count * + sizeof(struct llog_cookie)); +out: + RETURN(err); +} - sb->s_op = &ll_super_operations; +extern struct dentry_operations ll_d_ops; + +static int lustre_init_root_inode(struct super_block *sb) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct ptlrpc_request *request = NULL; + struct inode *root = NULL; + struct lustre_md md; + int err = 0; + ENTRY; - /* make root inode - * XXX: move this to after cbd setup? */ - err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid, - OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); + err = md_getstatus(sbi->ll_md_exp, &sbi->ll_rootid); if (err) { - CERROR("mdc_getattr failed for root: rc = %d\n", err); - GOTO(out_osc, err); + CERROR("cannot mds_connect: rc = %d\n", err); + GOTO(out, err); } + CDEBUG(D_SUPER, "rootid "DLID4"\n", OLID4(&sbi->ll_rootid)); - /* initialize committed transaction callback daemon */ - spin_lock_init(&sbi->ll_commitcbd_lock); - init_waitqueue_head(&sbi->ll_commitcbd_waitq); - init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq); - sbi->ll_commitcbd_flags = 0; - err = ll_commitcbd_setup(sbi); + sb->s_op = &lustre_super_operations; + + /* make root inode */ + err = md_getattr(sbi->ll_md_exp, &sbi->ll_rootid, + (OBD_MD_FLNOTOBD | OBD_MD_FLBLOCKS | OBD_MD_FID), + NULL, NULL, 0, 0, NULL, &request); if (err) { - CERROR("failed to start commit callback daemon: rc = %d\n",err); - ptlrpc_req_finished (request); - GOTO(out_lliod, err); + CERROR("md_getattr failed for root: rc = %d\n", err); + GOTO(out, err); } - err = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + err = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, + sbi->ll_dt_exp, &md); if (err) { - CERROR("failed to understand root inode md: rc = %d\n",err); - ptlrpc_req_finished (request); - GOTO(out_lliod, err); + CERROR("failed to understand root inode md: rc = %d\n", err); + ptlrpc_req_finished(request); + GOTO(out, err); } - LASSERT(sbi->ll_rootino != 0); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - root = iget4(sb, sbi->ll_rootino, NULL, &md); -#else - root = ll_iget(sb, sbi->ll_rootino, &md); -#endif + LASSERT(id_ino(&sbi->ll_rootid) != 0); + root = ll_iget(sb, id_ino(&sbi->ll_rootid), &md); ptlrpc_req_finished(request); if (root == NULL || is_bad_inode(root)) { - /* XXX might need iput() for bad inode */ + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); + if (md.mea != NULL) + obd_free_memmd(sbi->ll_md_exp, + (struct lov_stripe_md**)&md.mea); CERROR("lustre_lite: bad iget4 for root\n"); - GOTO(out_cbd, err = -EBADF); + GOTO(out_root, err = -EBADF); } + sb->s_root = d_alloc_root(root); + sb->s_root->d_op = &ll_d_ops; +out_root: + if (err) + iput(root); +out: + RETURN(err); +} -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - /* initialize the pagecache writeback thread */ - err = lliod_start(sbi, root); - if (err) { - CERROR("failed to start lliod: rc = %d\n",err); - GOTO(out_root, sb = NULL); +int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov, + char *gkc, int async, char *mds_security, + char *oss_security, __u32 *nllu, int pag, + __u64 *remote) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct obd_connect_data *data; + int err; + ENTRY; + + /*process the connect flags*/ + if ((*remote & (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) == + (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) { + CERROR("wrong remote flag "LPX64"\n", *remote); + RETURN(-EINVAL); } -#endif - sb->s_root = d_alloc_root(root); + + OBD_ALLOC(data, sizeof(*data)); + if (!data) + RETURN(-ENOMEM); + + data->ocd_connect_flags |= *remote & (OBD_CONNECT_LOCAL | + OBD_CONNECT_REMOTE); + memcpy(data->ocd_nllu, nllu, sizeof(data->ocd_nllu)); if (proc_lustre_fs_root) { - err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb, - osc, mdc); + err = lprocfs_register_mountpoint(proc_lustre_fs_root, + sb, lov, lmv); if (err < 0) CERROR("could not register mount in /proc/lustre"); } -out_dev: - if (mdc) - OBD_FREE(mdc, strlen(mdc) + 1); - if (osc) - OBD_FREE(osc, strlen(osc) + 1); + /*connect mds */ + err = lustre_connect_mds(sb, lmv, data, mds_security, async, pag); + if (err) + GOTO(out, err); - RETURN(err); + /*connect OST*/ + err = lustre_connect_ost(sb, lov, data, oss_security, async, pag); + if (err) + GOTO(out_lmv, err); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) -out_root: - iput(root); + err = lustre_init_crypto(sb, gkc, data, async); + if (err) { + CERROR("Could not connect to GSS err %d\n", err); + err = 0; + } + /*connect GSS*/ + err = lustre_init_root_inode(sb); + if (err) + GOTO(out_gks, err); + + err = ll_close_thread_start(&sbi->ll_lcq); + if (err) { + CERROR("cannot start close thread: rc %d\n", err); + GOTO(out_root, err); + } + + ll_gns_add_timer(sbi); + + /* making vm readahead 0 for 2.4.x. In the case of 2.6.x, + backing dev info assigned to inode mapping is used for + determining maximal readahead. */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)) && \ + !defined(KERNEL_HAS_AS_MAX_READAHEAD) + /* bug 2805 - set VM readahead to zero */ + vm_max_readahead = vm_min_readahead = 0; #endif -out_cbd: - ll_commitcbd_cleanup(sbi); -out_lliod: -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); + sb->s_flags |= MS_POSIXACL; +#ifdef S_PDIROPS + CWARN("Enabling PDIROPS\n"); + sb->s_flags |= S_PDIROPS; #endif -out_osc: - obd_disconnect(&sbi->ll_osc_conn, 0); -out_mdc: - obd_disconnect(&sbi->ll_mdc_conn, 0); -out_free: + if (data != NULL) + OBD_FREE(data, sizeof(*data)); + RETURN(err); +out_root: + if (sb->s_root) + dput(sb->s_root); +out_gks: + lustre_destroy_crypto(sb); +out_lmv: + obd_disconnect(sbi->ll_md_exp, 0); +out: + if (data != NULL) + OBD_FREE(data, sizeof(*data)); lprocfs_unregister_mountpoint(sbi); - OBD_FREE(sbi, sizeof(*sbi)); - - goto out_dev; -} /* ll_read_super */ + RETURN(err); +} -void ll_put_super(struct super_block *sb) +void lustre_common_put_super(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn); - struct list_head *tmp, *next; -#else struct hlist_node *tmp, *next; -#endif - struct ll_fid rootfid; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - list_del(&sbi->ll_conn_chain); - ll_commitcbd_cleanup(sbi); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); -#endif - obd_disconnect(&sbi->ll_osc_conn, 0); + ll_gns_del_timer(sbi); + ll_close_thread_stop(sbi->ll_lcq); - /* NULL request to force sync on the MDS, and get the last_committed - * value to flush remaining RPCs from the sending queue on client. - * - * XXX This should be an mdc_sync() call to sync the whole MDS fs, - * which we can call for other reasons as well. - */ -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - if (!obd->obd_no_recov) -#endif - mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + lustre_destroy_crypto(sb); + + list_del(&sbi->ll_conn_chain); + obd_disconnect(sbi->ll_dt_exp, 0); lprocfs_unregister_mountpoint(sbi); if (sbi->ll_proc_root) { @@ -345,198 +477,744 @@ void ll_put_super(struct super_block *sb) sbi->ll_proc_root = NULL; } - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_md_exp, 0); - spin_lock(&dcache_lock); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - list_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { - struct dentry *dentry = list_entry(tmp, struct dentry, d_hash); - shrink_dcache_parent(dentry); - } -#else + // We do this to get rid of orphaned dentries. That is not really trw. hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash); + CWARN("orphan dentry %.*s (%p->%p) at unmount\n", + dentry->d_name.len, dentry->d_name.name, dentry, next); shrink_dcache_parent(dentry); } -#endif - spin_unlock(&dcache_lock); - - OBD_FREE(sbi, sizeof(*sbi)); - EXIT; -} /* ll_put_super */ +} -void ll_clear_inode(struct inode *inode) +char *ll_read_opt(const char *opt, char *data) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ll_inode_info *lli = ll_i2info(inode); - int rc; + char *value; + char *retval; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, - inode->i_generation, inode); - rc = ll_mdc_cancel_unused(&sbi->ll_mdc_conn, inode, - LDLM_FL_WARN | LDLM_FL_NO_CALLBACK, inode); - if (rc < 0) { - CERROR("ll_mdc_cancel_unused: %d\n", rc); - /* XXX FIXME do something dramatic */ - } - - if (atomic_read(&inode->i_count) != 0) - CERROR("clearing in-use inode %lu: count = %d\n", - inode->i_ino, atomic_read(&inode->i_count)); - - if (lli->lli_smd) { - rc = obd_cancel_unused(&sbi->ll_osc_conn, lli->lli_smd, - LDLM_FL_WARN, inode); - if (rc < 0) { - CERROR("obd_cancel_unused: %d\n", rc); - /* XXX FIXME do something dramatic */ - } - obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd); - lli->lli_smd = NULL; - } + CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); + if (strncmp(opt, data, strlen(opt))) + RETURN(NULL); + if ((value = strchr(data, '=')) == NULL) + RETURN(NULL); - if (lli->lli_symlink_name) { - OBD_FREE(lli->lli_symlink_name, - strlen(lli->lli_symlink_name) + 1); - lli->lli_symlink_name = NULL; + value++; + OBD_ALLOC(retval, strlen(value) + 1); + if (!retval) { + CERROR("out of memory!\n"); + RETURN(NULL); } - EXIT; + memcpy(retval, value, strlen(value)+1); + CDEBUG(D_SUPER, "Assigned option: %s, value %s\n", opt, retval); + RETURN(retval); } -#if 0 -static void ll_delete_inode(struct inode *inode) +int ll_set_opt(const char *opt, char *data, int fl) { ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu(%p)\n", inode->i_ino, inode); - if (S_ISREG(inode->i_mode)) { - int err; - struct obdo *oa; - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - - /* mcreate with no open */ - if (!lsm) - GOTO(out, 0); - if (lsm->lsm_object_id == 0) { - CERROR("This really happens\n"); - /* No obdo was ever created */ - GOTO(out, 0); - } + CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data); + if (strncmp(opt, data, strlen(opt))) + RETURN(0); + else + RETURN(fl); +} - oa = obdo_alloc(); - if (oa == NULL) - GOTO(out, -ENOMEM); +void ll_options(char *options, char **lov, char **lmv, char **gkc, + char **mds_sec, char **oss_sec, int *async, int *flags) +{ + char *this_char; +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) + char *opt_ptr = options; +#endif + ENTRY; - oa->o_id = lsm->lsm_object_id; - oa->o_valid = OBD_MD_FLID; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE); + if (!options) { + EXIT; + return; + } - err = obd_destroy(ll_i2obdconn(inode), oa, lsm, NULL); - obdo_free(oa); - if (err) - CDEBUG(D_INODE, - "inode %lu obd_destroy objid "LPX64" error %d\n", - inode->i_ino, lsm->lsm_object_id, err); + *async = 0; +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + for (this_char = strtok (options, ","); + this_char != NULL; + this_char = strtok (NULL, ",")) { +#else + while ((this_char = strsep (&opt_ptr, ",")) != NULL) { +#endif + CDEBUG(D_SUPER, "this_char %s\n", this_char); + if (!*lov && (*lov = ll_read_opt("osc", this_char))) + continue; + if (!*lmv && (*lmv = ll_read_opt("mdc", this_char))) + continue; + if (!*gkc && (*gkc = ll_read_opt("gkc", this_char))) + continue; + if (!strncmp(this_char, "lasync", strlen("lasync"))) { + *async = 1; + continue; + } + if (!*mds_sec && (*mds_sec = ll_read_opt("mds_sec", this_char))) + continue; + if (!*oss_sec && (*oss_sec = ll_read_opt("oss_sec", this_char))) + continue; + if (!(*flags & LL_SBI_NOLCK) && + ((*flags) = (*flags) | + ll_set_opt("nolock", this_char, + LL_SBI_NOLCK))) + continue; } + + EXIT; +} + +void ll_lli_init(struct ll_inode_info *lli) +{ + sema_init(&lli->lli_open_sem, 1); + sema_init(&lli->lli_size_sem, 1); + lli->lli_flags = 0; + lli->lli_size_pid = 0; + lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; + spin_lock_init(&lli->lli_lock); + INIT_LIST_HEAD(&lli->lli_pending_write_llaps); + INIT_LIST_HEAD(&lli->lli_close_item); + lli->lli_inode_magic = LLI_INODE_MAGIC; + memset(&lli->lli_id, 0, sizeof(lli->lli_id)); + sema_init(&lli->lli_och_sem, 1); + lli->lli_mds_read_och = lli->lli_mds_write_och = NULL; + lli->lli_mds_exec_och = NULL; + lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0; + lli->lli_open_fd_exec_count = 0; + lli->lli_audit_mask = AUDIT_OFF; + lli->lli_key_info = NULL; + init_waitqueue_head(&lli->lli_dirty_wait); + lli->lli_io_epoch = 0; + INIT_LIST_HEAD(&lli->lli_capas); +} + +int ll_fill_super(struct super_block *sb, void *data, int silent) +{ + struct ll_sb_info *sbi; + char *lov = NULL, *lmv = NULL, *gkc = NULL; + char *mds_sec = NULL; + char *oss_sec = NULL; + int async, err; + __u32 nllu[2] = { NOBODY_UID, NOBODY_GID }; + __u64 remote_flag = 0; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); + + sbi = lustre_init_sbi(sb); + if (!sbi) + RETURN(-ENOMEM); + + sbi->ll_flags |= LL_SBI_READAHEAD; + ll_options(data, &lov, &lmv, &gkc, &mds_sec, &oss_sec, + &async, &sbi->ll_flags); + + if (!lov || !lmv) { + CERROR("no osc %p or no mdc %p\n", lov, lmv); + GOTO(out, err = -EINVAL); + } + + err = lustre_common_fill_super(sb, lmv, lov, gkc, async, mds_sec, + oss_sec, nllu, 0, &remote_flag); + EXIT; out: - clear_inode(inode); + if (err) + lustre_free_sbi(sb); + + if (lmv) + OBD_FREE(lmv, strlen(lmv) + 1); + if (lov) + OBD_FREE(lov, strlen(lov) + 1); + if (mds_sec) + OBD_FREE(mds_sec, strlen(mds_sec) + 1); + if (oss_sec) + OBD_FREE(oss_sec, strlen(oss_sec) + 1); + if (gkc) + OBD_FREE(gkc, strlen(gkc) + 1); + + return err; +} /* ll_read_super */ + +static int lustre_process_log(struct lustre_mount_data *lmd, char *profile, + struct config_llog_instance *cfg, int allow_recov) +{ + struct lustre_cfg *lcfg = NULL; + struct lustre_cfg_bufs bufs; + struct portals_cfg pcfg; + char *peer = "MDS_PEER_UUID"; + struct obd_device *obd; + struct lustre_handle md_conn = {0, }; + struct obd_export *exp; + char *name = "mdc_dev"; + class_uuid_t uuid; + struct obd_uuid lmv_uuid; + struct llog_ctxt *ctxt; + int rc = 0, err = 0; + ENTRY; + + if (lmd_bad_magic(lmd)) + RETURN(-EINVAL); + + generate_random_uuid(uuid); + class_uuid_unparse(uuid, &lmv_uuid); + + if (lmd->lmd_local_nid) { + PCFG_INIT(pcfg, NAL_CMD_REGISTER_MYNID); + pcfg.pcfg_nal = lmd->lmd_nal; + pcfg.pcfg_nid = lmd->lmd_local_nid; + rc = libcfs_nal_cmd(&pcfg); + if (rc < 0) + GOTO(out, rc); + } + + if (lmd->lmd_nal == SOCKNAL || + lmd->lmd_nal == OPENIBNAL || + lmd->lmd_nal == IIBNAL || + lmd->lmd_nal == VIBNAL || + lmd->lmd_nal == RANAL) { + PCFG_INIT(pcfg, NAL_CMD_ADD_PEER); + pcfg.pcfg_nal = lmd->lmd_nal; + pcfg.pcfg_nid = lmd->lmd_server_nid; + pcfg.pcfg_id = lmd->lmd_server_ipaddr; + pcfg.pcfg_misc = lmd->lmd_port; + rc = libcfs_nal_cmd(&pcfg); + if (rc < 0) + GOTO(out, rc); + } + lustre_cfg_bufs_reset(&bufs, name); + lustre_cfg_bufs_set_string(&bufs, 1, peer); + + lcfg = lustre_cfg_new(LCFG_ADD_UUID, &bufs); + lcfg->lcfg_nal = lmd->lmd_nal; + lcfg->lcfg_nid = lmd->lmd_server_nid; + LASSERT(lcfg->lcfg_nal); + LASSERT(lcfg->lcfg_nid); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err < 0) + GOTO(out_del_conn, err); + + lustre_cfg_bufs_reset(&bufs, name); + lustre_cfg_bufs_set_string(&bufs, 1, OBD_MDC_DEVICENAME); + lustre_cfg_bufs_set_string(&bufs, 2, (char *)lmv_uuid.uuid); + + lcfg = lustre_cfg_new(LCFG_ATTACH, &bufs); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err < 0) + GOTO(out_del_uuid, err); + + lustre_cfg_bufs_reset(&bufs, name); + lustre_cfg_bufs_set_string(&bufs, 1, lmd->lmd_mds); + lustre_cfg_bufs_set_string(&bufs, 2, peer); + + lcfg = lustre_cfg_new(LCFG_SETUP, &bufs); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err < 0) + GOTO(out_detach, err); + + obd = class_name2obd(name); + if (obd == NULL) + GOTO(out_cleanup, rc = -EINVAL); + + rc = obd_set_info(obd->obd_self_export, strlen("sec"), "sec", + strlen(lmd->lmd_mds_security), lmd->lmd_mds_security); + if (rc) + GOTO(out_cleanup, rc); + + if (lmd->lmd_pag) { + unsigned long sec_flags = PTLRPC_SEC_FL_PAG; + rc = obd_set_info(obd->obd_self_export, + strlen("sec_flags"), "sec_flags", + sizeof(sec_flags), &sec_flags); + if (rc) + GOTO(out_cleanup, rc); + } + + /* Disable initial recovery on this import */ + rc = obd_set_info(obd->obd_self_export, + strlen("initial_recov"), "initial_recov", + sizeof(allow_recov), &allow_recov); + if (rc) + GOTO(out_cleanup, rc); + + rc = obd_connect(&md_conn, obd, &lmv_uuid, NULL, OBD_OPT_REAL_CLIENT); + if (rc) { + CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, rc); + GOTO(out_cleanup, rc); + } + + exp = class_conn2export(&md_conn); + + ctxt = llog_get_context(&exp->exp_obd->obd_llogs,LLOG_CONFIG_REPL_CTXT); + rc = class_config_process_llog(ctxt, profile, cfg); + if (rc) + CERROR("class_config_process_llog failed: rc = %d\n", rc); + + err = obd_disconnect(exp, 0); + EXIT; +out_cleanup: + lustre_cfg_bufs_reset(&bufs, name); + lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err < 0) + GOTO(out, err); +out_detach: + lustre_cfg_bufs_reset(&bufs, name); + lcfg = lustre_cfg_new(LCFG_DETACH, &bufs); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err < 0) + GOTO(out, err); + +out_del_uuid: + lustre_cfg_bufs_reset(&bufs, name); + lustre_cfg_bufs_set_string(&bufs, 1, peer); + lcfg = lustre_cfg_new(LCFG_DEL_UUID, &bufs); + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + +out_del_conn: + if (lmd->lmd_nal == SOCKNAL || + lmd->lmd_nal == OPENIBNAL || + lmd->lmd_nal == IIBNAL || + lmd->lmd_nal == VIBNAL || + lmd->lmd_nal == RANAL) { + int err2; + + PCFG_INIT(pcfg, NAL_CMD_DEL_PEER); + pcfg.pcfg_nal = lmd->lmd_nal; + pcfg.pcfg_nid = lmd->lmd_server_nid; + pcfg.pcfg_flags = 1; /* single_share */ + err2 = libcfs_nal_cmd(&pcfg); + if (err2 && !err) + err = err2; + if (err < 0) + GOTO(out, err); + } +out: + if (rc == 0) + rc = err; + + return rc; } -#endif -/* like inode_setattr, but doesn't mark the inode dirty */ -int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc) +static void lustre_manual_cleanup(struct ll_sb_info *sbi) { - unsigned int ia_valid = attr->ia_valid; - int error = 0; + struct lustre_cfg *lcfg; + struct lustre_cfg_bufs bufs; + struct obd_device *obd; + int next = 0; - if ((ia_valid & ATTR_SIZE) && trunc) { - if (attr->ia_size > ll_file_maxbytes(inode)) { - error = -EFBIG; - goto out; + while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL) { + int err; + + lustre_cfg_bufs_reset(&bufs, obd->obd_name); + lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs); + err = class_process_config(lcfg); + if (err) { + CERROR("cleanup failed: %s\n", obd->obd_name); + //continue; + } + + lcfg->lcfg_command = LCFG_DETACH; + err = class_process_config(lcfg); + lustre_cfg_free(lcfg); + if (err) { + CERROR("detach failed: %s\n", obd->obd_name); + //continue; } - error = vmtruncate(inode, attr->ia_size); - if (error) - goto out; - } else if (ia_valid & ATTR_SIZE) - inode->i_size = attr->ia_size; - - if (ia_valid & ATTR_UID) - inode->i_uid = attr->ia_uid; - if (ia_valid & ATTR_GID) - inode->i_gid = attr->ia_gid; - if (ia_valid & ATTR_ATIME) - inode->i_atime = attr->ia_atime; - if (ia_valid & ATTR_MTIME) - inode->i_mtime = attr->ia_mtime; - if (ia_valid & ATTR_CTIME) - inode->i_ctime = attr->ia_ctime; - if (ia_valid & ATTR_MODE) { - inode->i_mode = attr->ia_mode; - if (!in_group_p(inode->i_gid) && !capable(CAP_FSETID)) - inode->i_mode &= ~S_ISGID; } -out: - return error; + + if (sbi->ll_lmd != NULL) + class_del_profile(sbi->ll_lmd->lmd_profile); } -int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) +static int lustre_process_profile(struct super_block *sb, + struct lustre_mount_data *lmd, + char **lov, char **lmv, char **gkc) { - struct ptlrpc_request *request = NULL; - struct ll_sb_info *sbi = ll_i2sbi(inode); - int err = 0; + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct config_llog_instance cfg; + struct lustre_profile *lprof; + int len, err = 0; ENTRY; - /* change incore inode */ - err = ll_attr2inode(inode, attr, do_trunc); - if (err) + if (!lmd->lmd_profile) + RETURN(0); + + if (lmd->lmd_mds[0] == '\0') { + CERROR("no mds name\n"); + GOTO(out, err = -EINVAL); + } + lmd->lmd_mds_security[sizeof(lmd->lmd_mds_security) - 1] = 0; + lmd->lmd_oss_security[sizeof(lmd->lmd_oss_security) - 1] = 0; + + OBD_ALLOC(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); + if (sbi->ll_lmd == NULL) + GOTO(out, err = -ENOMEM); + memcpy(sbi->ll_lmd, lmd, sizeof(*lmd)); + + /* generate a string unique to this super, let's try the address of the + * super itself. */ + len = (sizeof(sb) * 2) + 1; + OBD_ALLOC(sbi->ll_instance, len); + if (sbi->ll_instance == NULL) + GOTO(out, err = -ENOMEM); + sprintf(sbi->ll_instance, "%p", sb); + + cfg.cfg_instance = sbi->ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + cfg.cfg_local_nid = lmd->lmd_local_nid; + err = lustre_process_log(lmd, lmd->lmd_profile, &cfg, 0); + if (err < 0) { + CERROR("Unable to process log: %s\n", lmd->lmd_profile); + GOTO(out, err); + } + + lprof = class_get_profile(lmd->lmd_profile); + if (lprof == NULL) { + CERROR("No profile found: %s\n", lmd->lmd_profile); + GOTO(out, err = -EINVAL); + } + + OBD_ALLOC(*lov, strlen(lprof->lp_lov) + + strlen(sbi->ll_instance) + 2); + if (*lov == NULL) + GOTO(out, err = -ENOMEM); + + sprintf(*lov, "%s-%s", lprof->lp_lov, sbi->ll_instance); + + OBD_ALLOC(*lmv, strlen(lprof->lp_lmv) + + strlen(sbi->ll_instance) + 2); + if (*lmv == NULL) + GOTO(out_free_lov, err = -ENOMEM); + + sprintf(*lmv, "%s-%s", lprof->lp_lmv, sbi->ll_instance); + + if (lprof->lp_gkc) { + OBD_ALLOC(*gkc, strlen(lprof->lp_gkc) + + strlen(sbi->ll_instance) + 2); + if (*gkc == NULL) + GOTO(out_free_lmv, err = -ENOMEM); + + sprintf(*gkc, "%s-%s", lprof->lp_gkc, sbi->ll_instance); + } + + RETURN(err); +out_free_lmv: + OBD_FREE(*lmv, strlen(lprof->lp_lmv) + + strlen(sbi->ll_instance) + 2); +out_free_lov: + OBD_FREE(*lov, strlen(lprof->lp_lov) + + strlen(sbi->ll_instance) + 2); +out: + return err; +} + +static int lustre_clean_profile(struct ll_sb_info *sbi, int force_umount) +{ + struct lustre_mount_data *lmd = sbi->ll_lmd; + struct config_llog_instance cfg; + char *cl_prof; + int len, err = 0; + ENTRY; + + if (!lmd) RETURN(err); - /* Don't send size changes to MDS to avoid "fast EA" problems, and - * also avoid a pointless RPC (we get file size from OST anyways). - */ - attr->ia_valid &= ~ATTR_SIZE; - if (attr->ia_valid) { - struct mdc_op_data op_data; + len = strlen(sbi->ll_lmd->lmd_profile) + sizeof("-clean") + 1; + + if (force_umount) { + CERROR("force umount, doing manual cleanup\n"); + lustre_manual_cleanup(sbi); + GOTO(free_lmd, 0); + + } + if (sbi->ll_instance != NULL) { + cfg.cfg_instance = sbi->ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + + OBD_ALLOC(cl_prof, len); + if (!cl_prof) { + CERROR("can't allocate memory, " + "skipping processing cleanup profile.\n"); + GOTO(free_lmd, err = -ENOMEM); + } + + sprintf(cl_prof, "%s-clean", lmd->lmd_profile); + err = lustre_process_log(lmd, cl_prof, &cfg, 0); + if (err < 0) { + CERROR("Unable to process log: %s\n", cl_prof); + lustre_manual_cleanup(sbi); + } + OBD_FREE(cl_prof, len); + } + EXIT; +free_lmd: + if (sbi->ll_instance) + OBD_FREE(sbi->ll_instance, strlen(sbi->ll_instance) + 1); + OBD_FREE(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); + return err; +} - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - err = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, NULL, 0, &request); - if (err) - CERROR("mdc_setattr fails: err = %d\n", err); +int lustre_fill_super(struct super_block *sb, void *data, int silent) +{ + struct lustre_mount_data * lmd = data; + char *lov = NULL, *lmv = NULL, *gkc = NULL; + struct ll_sb_info *sbi; + int err; + ENTRY; - ptlrpc_req_finished(request); - if (S_ISREG(inode->i_mode) && attr->ia_valid & ATTR_MTIME_SET) { - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - struct obdo oa; - int err2; + CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); + if (lmd_bad_magic(lmd)) + RETURN(-EINVAL); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - inode->i_ino, attr->ia_mtime); - oa.o_mtime = attr->ia_mtime; -#else - CDEBUG(D_INODE, "set mtime on OST inode %lu to " - LPU64"\n", inode->i_ino, - ll_ts2u64(&attr->ia_mtime)); - oa.o_mtime = ll_ts2u64(&attr->ia_mtime); -#endif - oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME; - err2 = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); - if (err2) { - CERROR("obd_setattr fails: rc=%d\n", err); - if (!err) - err = err2; - } + sbi = lustre_init_sbi(sb); + if (!sbi) + RETURN(-ENOMEM); + + sbi->ll_flags |= LL_SBI_READAHEAD; + + err = lustre_process_profile(sb, lmd, &lov, &lmv, &gkc); + if (err) { + CERROR("Can not process the profile err %d \n", err); + GOTO(out_free, err); + } + if (!lov || !lmv) { + CERROR("no osc %p or no mdc %p \n", lov, lmv); + GOTO(out_free, err = -EINVAL); + } + + err = lustre_common_fill_super(sb, lmv, lov, gkc, lmd->lmd_async, + lmd->lmd_mds_security, + lmd->lmd_oss_security, + &lmd->lmd_nllu, lmd->lmd_pag, + &lmd->lmd_remote_flag); + + if (err) + GOTO(out_free, err); + + EXIT; +out_dev: + if (lmv) + OBD_FREE(lmv, strlen(lmv) + 1); + if (lov) + OBD_FREE(lov, strlen(lov) + 1); + if (gkc) + OBD_FREE(gkc, strlen(gkc) + 1); + + return err; +out_free: + lustre_clean_profile(sbi, 0); + lustre_free_sbi(sb); + goto out_dev; + +} /* lustre_fill_super */ + +void lustre_put_super(struct super_block *sb) +{ + struct obd_device *obd; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int force_umount = 0; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); + obd = class_exp2obd(sbi->ll_md_exp); + if (obd) + force_umount = obd->obd_no_recov; + obd = NULL; + + lustre_common_put_super(sb); + lustre_clean_profile(sbi, force_umount); + lustre_free_sbi(sb); + + EXIT; +} /* lustre_put_super */ + +int ll_process_config_update(struct ll_sb_info *sbi, int clean) +{ + struct lustre_mount_data *lmd = sbi->ll_lmd; + char *profile = lmd->lmd_profile, *name = NULL; + struct config_llog_instance cfg; + int rc, namelen = 0, version; + struct llog_ctxt *ctxt; + ENTRY; + + if (profile == NULL) + RETURN(0); + if (lmd == NULL) { + CERROR("Client not mounted with zero-conf; cannot " + "process update log.\n"); + RETURN(0); + } + + cfg.cfg_instance = sbi->ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + cfg.cfg_local_nid = lmd->lmd_local_nid; + + namelen = strlen(profile) + 20; /* -clean-######### */ + OBD_ALLOC(name, namelen); + if (name == NULL) + RETURN(-ENOMEM); + + if (clean) { + version = sbi->ll_config_version - 1; + sprintf(name, "%s-clean-%d", profile, version); + } else { + version = sbi->ll_config_version + 1; + sprintf(name, "%s-%d", profile, version); + } + + CWARN("Applying configuration log %s\n", name); + + ctxt = llog_get_context(&sbi->ll_md_exp->exp_obd->obd_llogs, + LLOG_CONFIG_REPL_CTXT); + rc = class_config_process_llog(ctxt, name, &cfg); + if (rc == 0) + sbi->ll_config_version = version; + CWARN("Finished applying configuration log %s: %d\n", name, rc); + + if (rc == 0 && clean == 0) { + struct lov_desc desc; + __u32 valsize; + int rc = 0; + + valsize = sizeof(desc); + rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, + "lovdesc", &valsize, &desc); + + rc = obd_init_ea_size(sbi->ll_md_exp, + obd_size_diskmd(sbi->ll_dt_exp, NULL), + (desc.ld_tgt_count * + sizeof(struct llog_cookie))); + } + OBD_FREE(name, namelen); + RETURN(rc); +} + +struct inode *ll_inode_from_lock(struct ldlm_lock *lock) +{ + struct inode *inode = NULL; + + /* NOTE: we depend on atomic igrab() -bzzz */ + lock_res_and_lock(lock); + if (lock->l_ast_data) { + struct ll_inode_info *lli = ll_i2info(lock->l_ast_data); + if (lli->lli_inode_magic == LLI_INODE_MAGIC) { + inode = igrab(lock->l_ast_data); + } else { + struct timeval now; + do_gettimeofday(&now); + inode = lock->l_ast_data; + LDLM_ERROR(lock, "granted at %lu.%lu, now %lu.%lu", + lock->l_enqueued_time.tv_sec, + lock->l_enqueued_time.tv_usec, + now.tv_sec, now.tv_usec); + CDEBUG(inode->i_state & I_FREEING ? D_INFO : D_WARNING, + "l_ast_data %p is bogus: magic %0x8\n", + lock->l_ast_data, lli->lli_inode_magic); + CDEBUG(D_ERROR, "i_state = 0x%lx, l_ast_data %p is bogus: magic %0x8\n", + inode->i_state, lock->l_ast_data, lli->lli_inode_magic); + inode = NULL; + unlock_res_and_lock(lock); + LBUG(); } } + unlock_res_and_lock(lock); + return inode; +} - RETURN(err); +int null_if_equal(struct ldlm_lock *lock, void *data) +{ + if (data == lock->l_ast_data) { + lock->l_ast_data = NULL; + + if (lock->l_req_mode != lock->l_granted_mode) + LDLM_ERROR(lock,"clearing inode with ungranted lock\n"); + } + + return LDLM_ITER_CONTINUE; +} + +static void remote_acl_free(struct remote_acl *racl); + +void ll_clear_inode(struct inode *inode) +{ + struct lustre_id id; + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct obd_capa *ocapa, *tmp; + ENTRY; + + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, + inode->i_generation, inode); + + LASSERT(ll_is_inode_dirty(inode) == 0); + ll_inode2id(&id, inode); + + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); + md_change_cbdata(sbi->ll_md_exp, &id, null_if_equal, inode); + + LASSERT(!lli->lli_open_fd_write_count); + LASSERT(!lli->lli_open_fd_read_count); + LASSERT(!lli->lli_open_fd_exec_count); + if (lli->lli_mds_write_och) + ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE); + if (lli->lli_mds_exec_och) + ll_md_real_close(sbi->ll_md_exp, inode, FMODE_EXEC); + if (lli->lli_mds_read_och) + ll_md_real_close(sbi->ll_md_exp, inode, FMODE_READ); + if (lli->lli_smd) + obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd, + null_if_equal, inode); + + if (lli->lli_smd) { + obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd); + lli->lli_smd = NULL; + } + + if (lli->lli_mea) { + obd_free_memmd(sbi->ll_md_exp, + (struct lov_stripe_md **) &lli->lli_mea); + lli->lli_mea = NULL; + } + ll_crypto_destroy_inode_key(inode); + if (lli->lli_symlink_name) { + OBD_FREE(lli->lli_symlink_name, + strlen(lli->lli_symlink_name) + 1); + lli->lli_symlink_name = NULL; + } + + if (lli->lli_posix_acl) { + LASSERT(lli->lli_remote_acl == NULL); + posix_acl_release(lli->lli_posix_acl); + lli->lli_posix_acl = NULL; + } + + if (lli->lli_remote_acl) { + LASSERT(lli->lli_posix_acl == NULL); + remote_acl_free(lli->lli_remote_acl); + lli->lli_remote_acl = NULL; + } + + list_for_each_entry_safe(ocapa, tmp, &lli->lli_capas, u.client.lli_list) + capa_put(ocapa); + + LASSERT(!mapping_has_pages(inode->i_mapping)); + + lli->lli_inode_magic = LLI_INODE_DEAD; + EXIT; } /* If this inode has objects allocated to it (lsm != NULL), then the OST @@ -552,24 +1230,19 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) * I don't believe it is possible to get e.g. ATTR_MTIME_SET and ATTR_SIZE * at the same time. */ -#define OST_ATTR (ATTR_MTIME | ATTR_MTIME_SET | ATTR_CTIME | \ - ATTR_ATIME | ATTR_ATIME_SET | ATTR_SIZE) int ll_setattr_raw(struct inode *inode, struct iattr *attr) { struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; - time_t now = LTIME_S(CURRENT_TIME); + struct mdc_op_data *op_data; int ia_valid = attr->ia_valid; - int rc = 0; + int err, rc = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", inode->i_ino); - -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETATTR); -#endif if (ia_valid & ATTR_SIZE) { if (attr->ia_size > ll_file_maxbytes(inode)) { @@ -581,247 +1254,737 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) attr->ia_valid |= ATTR_MTIME | ATTR_CTIME; } + /* POSIX: check before ATTR_*TIME_SET set (from inode_change_ok) */ + if (ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET)) { + if (current->fsuid != inode->i_uid && !capable(CAP_FOWNER)) + RETURN(-EPERM); + } + /* We mark all of the fields "set" so MDS/OST does not re-set them */ if (attr->ia_valid & ATTR_CTIME) { - attr->ia_ctime = now; + attr->ia_ctime = CURRENT_TIME; attr->ia_valid |= ATTR_CTIME_SET; } if (!(ia_valid & ATTR_ATIME_SET) && (attr->ia_valid & ATTR_ATIME)) { - attr->ia_atime = now; + attr->ia_atime = CURRENT_TIME; attr->ia_valid |= ATTR_ATIME_SET; } if (!(ia_valid & ATTR_MTIME_SET) && (attr->ia_valid & ATTR_MTIME)) { - attr->ia_mtime = now; + attr->ia_mtime = CURRENT_TIME; attr->ia_valid |= ATTR_MTIME_SET; } if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n", - attr->ia_mtime, attr->ia_ctime, now); - if (lsm) - attr->ia_valid &= ~ATTR_SIZE; + LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime), + LTIME_S(CURRENT_TIME)); /* If only OST attributes being set on objects, don't do MDS RPC. * In that case, we need to check permissions and update the local * inode ourselves so we can call obdo_from_inode() always. */ - if (ia_valid & (lsm ? ~(OST_ATTR | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { + if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN /*| ATTR_RAW*/) : ~0)) { struct lustre_md md; - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - - rc = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, NULL, 0, &request); - + void *key = NULL; + int key_size = 0; + + OBD_ALLOC(op_data, sizeof(*op_data)); + if (op_data == NULL) + RETURN(-ENOMEM); + ll_inode2mdc_data(op_data, inode, (OBD_MD_FLID | OBD_MD_MEA)); + + if (ia_valid & (ATTR_UID | ATTR_GID | ATTR_MODE)) { + rc = ll_crypto_get_mac(inode, attr, NULL, 0, &key, + &key_size); + if (rc) { + CERROR("can not get right mac, rc=%d\n", rc); + if (key && key_size) + OBD_FREE(key, key_size); + RETURN(rc); + } + } + rc = md_setattr(sbi->ll_md_exp, op_data, + attr, key, key_size, NULL, 0, NULL, + 0, &request); + OBD_FREE(op_data, sizeof(*op_data)); + + if (key && key_size) + OBD_FREE(key, key_size); if (rc) { ptlrpc_req_finished(request); if (rc != -EPERM && rc != -EACCES) - CERROR("mdc_setattr fails: rc = %d\n", rc); + CERROR("md_setattr fails: rc = %d\n", rc); RETURN(rc); } - - rc = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + rc = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, + sbi->ll_dt_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); } - ll_update_inode(inode, md.body, md.lsm); + + if (attr->ia_valid & ATTR_SIZE) { + rc = ll_set_trunc_capa(request, 0, inode); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + } + + /* We call inode_setattr to adjust timestamps, but we first + * clear ATTR_SIZE to avoid invoking vmtruncate. + * + * NB: ATTR_SIZE will only be set at this point if the size + * resides on the MDS, ie, this file has no objects. */ + attr->ia_valid &= ~ATTR_SIZE; + + /* + * assigning inode_setattr() to @err to disable warning that + * function's result should be checked by by caller. error is + * impossible here, as vmtruncate() control path is disabled. + */ + err = inode_setattr(inode, attr); + ll_update_inode(inode, &md); ptlrpc_req_finished(request); - if (!md.lsm || !S_ISREG(inode->i_mode)) { + if (!lsm || !S_ISREG(inode->i_mode)) { CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); RETURN(0); } } else { - /* The OST doesn't check permissions, but the alternative is - * a gratuitous RPC to the MDS. We already rely on the client - * to do read/write/truncate permission checks, so is mtime OK? + /* The OST doesn't check permissions, but the alternative is + * a gratuitous RPC to the MDS. We already rely on the client + * to do read/write/truncate permission checks, so is mtime OK? + */ + if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) { + /* from sys_utime() */ + if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { + if (current->fsuid != inode->i_uid && + (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0) + RETURN(rc); + } else { + /* from inode_change_ok() */ + if (current->fsuid != inode->i_uid && + !capable(CAP_FOWNER)) + RETURN(-EPERM); + } + } + + if (lsm) + attr->ia_valid &= ~ATTR_SIZE; + + /* won't invoke vmtruncate, as we already cleared ATTR_SIZE */ + err = inode_setattr(inode, attr); + /* + * assigning inode_setattr() to @err to disable warning that + * function's result should be checked by by caller. error is + * impossible here, as vmtruncate() control path is disabled. + */ + } + + /* We really need to get our PW lock before we change inode->i_size. + * If we don't we can race with other i_size updaters on our node, like + * ll_file_read. We can also race with i_size propogation to other + * nodes through dirtying and writeback of final cached pages. This + * last one is especially bad for racing o_append users on other + * nodes. */ + if (ia_valid & ATTR_SIZE) { + ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, + OBD_OBJECT_EOF } }; + struct lustre_handle lockh = { 0 }; + int err, ast_flags = 0; + /* XXX when we fix the AST intents to pass the discard-range + * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA + * XXX here. */ + if (attr->ia_size == 0) + ast_flags = LDLM_AST_DISCARD_DATA; + + rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, + ast_flags, &ll_i2sbi(inode)->ll_seek_stime); + + if (rc != 0) + RETURN(rc); + + down(&lli->lli_size_sem); + lli->lli_size_pid = current->pid; + rc = vmtruncate(inode, attr->ia_size); + if (rc != 0) { + LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + lli->lli_size_pid = 0; + up(&lli->lli_size_sem); + } + + err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); + if (err) { + CERROR("ll_extent_unlock failed: %d\n", err); + if (!rc) + rc = err; + } + } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET | ATTR_UID | ATTR_GID)) { + struct obdo *oa = NULL; + + CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", + inode->i_ino, LTIME_S(attr->ia_mtime)); + + oa = obdo_alloc(); + if (oa == NULL) + RETURN(-ENOMEM); + + oa->o_id = lsm->lsm_object_id; + oa->o_gr = lsm->lsm_object_gr; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + /* adding uid and gid, needed for quota */ + if (ia_valid & ATTR_UID) { + oa->o_uid = inode->i_uid; + oa->o_valid |= OBD_MD_FLUID; + } + + if (ia_valid & ATTR_GID) { + oa->o_gid = inode->i_gid; + oa->o_valid |= OBD_MD_FLGID; + } + + *(obdo_id(oa)) = lli->lli_id; + oa->o_valid |= OBD_MD_FLIFID; + + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL, NULL); + obdo_free(oa); + if (rc) + CERROR("obd_setattr fails: rc = %d\n", rc); + } + + RETURN(rc); +} + +int ll_setattr(struct dentry *de, struct iattr *attr) +{ + LASSERT(de->d_inode); + return ll_setattr_raw(de->d_inode, attr); +} + +int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, + unsigned long max_age) +{ + struct ll_sb_info *sbi = ll_s2sbi(sb); + struct obd_statfs obd_osfs; + int rc; + ENTRY; + + rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age); + if (rc) { + CERROR("obd_statfs fails: rc = %d\n", rc); + RETURN(rc); + } + + osfs->os_type = sb->s_magic; + + CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); + + rc = obd_statfs(class_exp2obd(sbi->ll_dt_exp), &obd_osfs, max_age); + if (rc) { + CERROR("obd_statfs fails: rc = %d\n", rc); + RETURN(rc); + } + + CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", + obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree, + obd_osfs.os_files); + + osfs->os_blocks = obd_osfs.os_blocks; + osfs->os_bfree = obd_osfs.os_bfree; + osfs->os_bavail = obd_osfs.os_bavail; + + /* If we don't have as many objects free on the OST as inodes + * on the MDS, we reduce the total number of inodes to + * compensate, so that the "inodes in use" number is correct. + */ + if (obd_osfs.os_ffree < osfs->os_ffree) { + osfs->os_files = (osfs->os_files - osfs->os_ffree) + + obd_osfs.os_ffree; + osfs->os_ffree = obd_osfs.os_ffree; + } + + RETURN(rc); +} + +int ll_statfs(struct super_block *sb, struct kstatfs *sfs) +{ + struct obd_statfs osfs; + int rc; + + CDEBUG(D_VFSTRACE, "VFS Op: superblock %p\n", sb); + lprocfs_counter_incr(ll_s2sbi(sb)->ll_stats, LPROC_LL_STAFS); + + /* For now we will always get up-to-date statfs values, but in the + * future we may allow some amount of caching on the client (e.g. + * from QOS or lprocfs updates). */ + rc = ll_statfs_internal(sb, &osfs, jiffies - 1); + if (rc) + return rc; + + statfs_unpack(sfs, &osfs); + + if (sizeof(sfs->f_blocks) == 4) { + while (osfs.os_blocks > ~0UL) { + sfs->f_bsize <<= 1; + + osfs.os_blocks >>= 1; + osfs.os_bfree >>= 1; + osfs.os_bavail >>= 1; + } + } + + sfs->f_blocks = osfs.os_blocks; + sfs->f_bfree = osfs.os_bfree; + sfs->f_bavail = osfs.os_bavail; + + return 0; +} + + +/******************************** + * remote acl * + ********************************/ + +static struct remote_acl *remote_acl_alloc(void) +{ + struct remote_acl *racl; + int i; + + OBD_ALLOC(racl, sizeof(*racl)); + if (!racl) + return NULL; + + spin_lock_init(&racl->ra_lock); + init_MUTEX(&racl->ra_update_sem); + + for (i = 0; i < REMOTE_ACL_HASHSIZE; i++) + INIT_LIST_HEAD(&racl->ra_perm_cache[i]); + + return racl; +} + +/* + * caller should guarantee no race here. + */ +static void remote_perm_flush_xperms(struct lustre_remote_perm *perm) +{ + struct remote_perm_setxid *xperm; + + while (!list_empty(&perm->lrp_setxid_perms)) { + xperm = list_entry(perm->lrp_setxid_perms.next, + struct remote_perm_setxid, + list); + list_del(&xperm->list); + OBD_FREE(xperm, sizeof(*xperm)); + } +} + +/* + * caller should guarantee no race here. + */ +static void remote_acl_flush(struct remote_acl *racl) +{ + struct list_head *head; + struct lustre_remote_perm *perm, *tmp; + int i; + + for (i = 0; i < REMOTE_ACL_HASHSIZE; i++) { + head = &racl->ra_perm_cache[i]; + + list_for_each_entry_safe(perm, tmp, head, lrp_list) { + remote_perm_flush_xperms(perm); + list_del(&perm->lrp_list); + OBD_FREE(perm, sizeof(*perm)); + } + } +} + +static void remote_acl_free(struct remote_acl *racl) +{ + if (!racl) + return; + + down(&racl->ra_update_sem); + spin_lock(&racl->ra_lock); + remote_acl_flush(racl); + spin_unlock(&racl->ra_lock); + up(&racl->ra_update_sem); + + OBD_FREE(racl, sizeof(*racl)); +} + +static inline int remote_acl_hashfunc(__u32 id) +{ + return (id & (REMOTE_ACL_HASHSIZE - 1)); +} + +static +int __remote_acl_check(struct remote_acl *racl, unsigned int *perm) +{ + struct list_head *head; + struct lustre_remote_perm *lperm; + struct remote_perm_setxid *xperm; + int found = 0, rc = -ENOENT; + + LASSERT(racl); + head = &racl->ra_perm_cache[remote_acl_hashfunc(current->uid)]; + spin_lock(&racl->ra_lock); + + list_for_each_entry(lperm, head, lrp_list) { + if (lperm->lrp_auth_uid == current->uid) { + found = 1; + break; + } + } + + if (!found) + goto out; + + if (lperm->lrp_auth_uid == current->fsuid && + lperm->lrp_auth_gid == current->fsgid) { + if (lperm->lrp_valid) { + *perm = lperm->lrp_perm; + rc = 0; + } + goto out; + } else if ((!lperm->lrp_setuid && + lperm->lrp_auth_uid != current->fsuid) || + (!lperm->lrp_setgid && + lperm->lrp_auth_gid != current->fsgid)) { + *perm = 0; + rc = 0; + goto out; + } + + list_for_each_entry(xperm, &lperm->lrp_setxid_perms, list) { + if (xperm->uid == current->fsuid && + xperm->gid == current->fsgid) { + *perm = xperm->perm; + rc = 0; + goto out; + } + } + +out: + spin_unlock(&racl->ra_lock); + return rc; +} + +static +int __remote_acl_update(struct remote_acl *racl, + struct mds_remote_perm *mperm, + struct lustre_remote_perm *lperm, + struct remote_perm_setxid *xperm) +{ + struct list_head *head; + struct lustre_remote_perm *lp; + struct remote_perm_setxid *xp; + int found = 0, setuid = 0, setgid = 0; + + LASSERT(racl); + LASSERT(mperm); + LASSERT(lperm); + LASSERT(current->uid == mperm->mrp_auth_uid); + + if (current->fsuid != mperm->mrp_auth_uid) + setuid = 1; + if (current->fsgid != mperm->mrp_auth_gid) + setgid = 1; + + head = &racl->ra_perm_cache[remote_acl_hashfunc(current->uid)]; + spin_lock(&racl->ra_lock); + + list_for_each_entry(lp, head, lrp_list) { + if (lp->lrp_auth_uid == current->uid) { + found = 1; + break; + } + } + + if (found) { + OBD_FREE(lperm, sizeof(*lperm)); + + if (!lp->lrp_valid && !setuid && !setgid) { + lp->lrp_perm = mperm->mrp_perm; + lp->lrp_valid = 1; + } + + /* sanity check for changes of setxid rules */ + if ((lp->lrp_setuid != 0) != (mperm->mrp_allow_setuid != 0)) { + CWARN("setuid changes: %d => %d\n", + (lp->lrp_setuid != 0), + (mperm->mrp_allow_setuid != 0)); + lp->lrp_setuid = (mperm->mrp_allow_setuid != 0); + } + + if ((lp->lrp_setgid != 0) != (mperm->mrp_allow_setgid != 0)) { + CWARN("setgid changes: %d => %d\n", + (lp->lrp_setgid != 0), + (mperm->mrp_allow_setgid != 0)); + lp->lrp_setgid = (mperm->mrp_allow_setgid != 0); + } + + if (!lp->lrp_setuid && !lp->lrp_setgid && + !list_empty(&lp->lrp_setxid_perms)) { + remote_perm_flush_xperms(lp); + } + } else { + /* initialize lperm and linked into hashtable */ - if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) { - /* from sys_utime() */ - if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { - if (current->fsuid != inode->i_uid && - (rc = permission(inode, MAY_WRITE)) != 0) - RETURN(rc); - } else { - /* from inode_change_ok() */ - if (current->fsuid != inode->i_uid && - !capable(CAP_FOWNER)) - RETURN(-EPERM); - } + INIT_LIST_HEAD(&lperm->lrp_setxid_perms); + lperm->lrp_auth_uid = mperm->mrp_auth_uid; + lperm->lrp_auth_gid = mperm->mrp_auth_gid; + lperm->lrp_setuid = (mperm->mrp_allow_setuid != 0); + lperm->lrp_setgid = (mperm->mrp_allow_setgid != 0); + list_add(&lperm->lrp_list, head); + + if (!setuid && !setgid) { + /* in this case, i'm the authenticated user, + * and mrp_perm is for me. + */ + lperm->lrp_perm = mperm->mrp_perm; + lperm->lrp_valid = 1; + spin_unlock(&racl->ra_lock); + + if (xperm) + OBD_FREE(xperm, sizeof(*xperm)); + return 0; } - /* Won't invoke vmtruncate, as we already cleared ATTR_SIZE */ - inode_setattr(inode, attr); + lp = lperm; + /* fall through */ } - if (ia_valid & ATTR_SIZE) { - struct ldlm_extent extent = { .start = attr->ia_size, - .end = OBD_OBJECT_EOF }; - struct lustre_handle lockh = { 0 }; - int err; + LASSERT(lp->lrp_setuid || lp->lrp_setgid || + list_empty(&lp->lrp_setxid_perms)); - /* Writeback uses inode->i_size to determine how far out - * its cached pages go. ll_truncate gets a PW lock, canceling - * our lock, _after_ it has updated i_size. this can confuse - * - * We really need to get our PW lock before we change - * inode->i_size. If we don't we can race with other - * i_size updaters on our node, like ll_file_read. We - * can also race with i_size propogation to other - * nodes through dirtying and writeback of final cached - * pages. This last one is especially bad for racing - * o_append users on other nodes. */ - /* bug 1639: avoid write/truncate i_sem/DLM deadlock */ - LASSERT(atomic_read(&inode->i_sem.count) == 0); - up(&inode->i_sem); - rc = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, - &extent, &lockh); - down(&inode->i_sem); - if (rc != ELDLM_OK) { - if (rc > 0) - RETURN(-ENOLCK); - RETURN(rc); - } + /* if no xperm supplied, we are all done here */ + if (!xperm) { + spin_unlock(&racl->ra_lock); + return 0; + } - rc = vmtruncate(inode, attr->ia_size); - if (rc == 0) - set_bit(LLI_F_HAVE_SIZE_LOCK, - &ll_i2info(inode)->lli_flags); + /* whether we allow setuid/setgid */ + if ((!lp->lrp_setuid && setuid) || (!lp->lrp_setgid && setgid)) { + OBD_FREE(xperm, sizeof(*xperm)); + spin_unlock(&racl->ra_lock); + return 0; + } - /* unlock now as we don't mind others file lockers racing with - * the mds updates below? */ - err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); - if (err) { - CERROR("ll_extent_unlock failed: %d\n", err); - if (!rc) - rc = err; + /* traverse xperm list */ + list_for_each_entry(xp, &lp->lrp_setxid_perms, list) { + if (xp->uid == current->fsuid && + xp->gid == current->fsgid) { + if (xp->perm != mperm->mrp_perm) { + /* actually this should not happen */ + CWARN("perm changed: %o => %o\n", + xp->perm, mperm->mrp_perm); + xp->perm = mperm->mrp_perm; + } + OBD_FREE(xperm, sizeof(*xperm)); + spin_unlock(&racl->ra_lock); + return 0; } - } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { - struct obdo oa; - - CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - inode->i_ino, attr->ia_mtime); - oa.o_id = lsm->lsm_object_id; - oa.o_valid = OBD_MD_FLID; - obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); - if (rc) - CERROR("obd_setattr fails: rc=%d\n", rc); } - RETURN(rc); -} -int ll_setattr(struct dentry *de, struct iattr *attr) -{ - int rc = inode_change_ok(de->d_inode, attr); - CDEBUG(D_VFSTRACE, "VFS Op:name=%s\n", de->d_name.name); - if (rc) - return rc; + /* finally insert this xperm */ + xperm->uid = current->fsuid; + xperm->gid = current->fsgid; + xperm->perm = mperm->mrp_perm; + list_add(&xperm->list, &lp->lrp_setxid_perms); - lprocfs_counter_incr(ll_i2sbi(de->d_inode)->ll_stats, LPROC_LL_SETATTR); - return ll_inode_setattr(de->d_inode, attr, 1); + spin_unlock(&racl->ra_lock); + return 0; } -int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, - unsigned long max_age) +/* + * remote_acl semaphore must be held by caller + */ +static +int remote_acl_update_locked(struct remote_acl *racl, + struct mds_remote_perm *mperm) { - struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_statfs obd_osfs; - int rc; - ENTRY; + struct lustre_remote_perm *lperm; + struct remote_perm_setxid *xperm; + int setuid = 0, setgid = 0; - rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn), osfs, max_age); - if (rc) { - CERROR("mdc_statfs fails: rc = %d\n", rc); - RETURN(rc); + might_sleep(); + + if (current->uid != mperm->mrp_auth_uid) { + CERROR("current uid %u while authenticated as %u\n", + current->uid, mperm->mrp_auth_uid); + return -EINVAL; } - CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", - osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); + if (current->fsuid != mperm->mrp_auth_uid) + setuid = 1; + if (current->fsgid == mperm->mrp_auth_gid) + setgid = 1; + + OBD_ALLOC(lperm, sizeof(*lperm)); + if (!lperm) + return -ENOMEM; + + if ((setuid || setgid) && + !(setuid && !mperm->mrp_allow_setuid) && + !(setgid && !mperm->mrp_allow_setgid)) { + OBD_ALLOC(xperm, sizeof(*xperm)); + if (!xperm) { + OBD_FREE(lperm, sizeof(*lperm)); + return -ENOMEM; + } + } else + xperm = NULL; - rc = obd_statfs(class_conn2obd(&sbi->ll_osc_conn), &obd_osfs, max_age); - if (rc) { - CERROR("obd_statfs fails: rc = %d\n", rc); - RETURN(rc); + return __remote_acl_update(racl, mperm, lperm, xperm); +} + +/* + * return -EACCES at any error cases + */ +int ll_remote_acl_permission(struct inode *inode, int mode) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct remote_acl *racl = ll_i2info(inode)->lli_remote_acl; + struct ptlrpc_request *req = NULL; + struct lustre_id id; + struct mds_remote_perm *mperm; + int rc = -EACCES, perm; + + if (!racl) + return -EACCES; + + if (__remote_acl_check(racl, &perm) == 0) { + return ((perm & mode) == mode ? 0 : -EACCES); } - CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", - obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree, - obd_osfs.os_files); + might_sleep(); - osfs->os_blocks = obd_osfs.os_blocks; - osfs->os_bfree = obd_osfs.os_bfree; - osfs->os_bavail = obd_osfs.os_bavail; + /* doing update + */ + down(&racl->ra_update_sem); - /* If we don't have as many objects free on the OST as inodes - * on the MDS, we reduce the total number of inodes to - * compensate, so that the "inodes in use" number is correct. + /* we might lose the race when obtain semaphore, + * so check again. */ - if (obd_osfs.os_ffree < osfs->os_ffree) { - osfs->os_files = (osfs->os_files - osfs->os_ffree) + - obd_osfs.os_ffree; - osfs->os_ffree = obd_osfs.os_ffree; + if (__remote_acl_check(racl, &perm) == 0) { + if ((perm & mode) == mode) + rc = 0; + goto out; } - RETURN(rc); + /* really fetch from mds + */ + ll_inode2id(&id, inode); + if (md_access_check(sbi->ll_md_exp, &id, &req)) + goto out; + + /* status non-zero indicate there's more apparent error + * detected by mds, e.g. didn't allow this user at all. + * we simply ignore and didn't cache it. + */ + if (req->rq_repmsg->status) + goto out; + + mperm = lustre_swab_repbuf(req, 1, sizeof(*mperm), + lustre_swab_remote_perm); + LASSERT(mperm); + LASSERT_REPSWABBED(req, 1); + + if ((mperm->mrp_perm & mode) == mode) + rc = 0; + + remote_acl_update_locked(racl, mperm); +out: + if (req) + ptlrpc_req_finished(req); + + up(&racl->ra_update_sem); + return rc; } -int ll_statfs(struct super_block *sb, struct kstatfs *sfs) +int ll_remote_acl_update(struct inode *inode, struct mds_remote_perm *perm) { - struct obd_statfs osfs; + struct remote_acl *racl = ll_i2info(inode)->lli_remote_acl; int rc; - CDEBUG(D_VFSTRACE, "VFS Op:\n"); - lprocfs_counter_incr(ll_s2sbi(sb)->ll_stats, LPROC_LL_STAFS); + LASSERT(perm); - /* For now we will always get up-to-date statfs values, but in the - * future we may allow some amount of caching on the client (e.g. - * from QOS or lprocfs updates). */ - rc = ll_statfs_internal(sb, &osfs, jiffies - 1); - if (rc) - return rc; + if (!racl) + return -EACCES; - statfs_unpack(sfs, &osfs); + down(&racl->ra_update_sem); + rc = remote_acl_update_locked(racl, perm); + up(&racl->ra_update_sem); - if (sizeof(sfs->f_blocks) == 4) { - while (osfs.os_blocks > ~0UL) { - sfs->f_bsize <<= 1; + return rc; +} - osfs.os_blocks >>= 1; - osfs.os_bfree >>= 1; - osfs.os_bavail >>= 1; - } - } +void ll_inode_invalidate_acl(struct inode *inode) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_inode_info *lli = ll_i2info(inode); - sfs->f_blocks = osfs.os_blocks; - sfs->f_bfree = osfs.os_bfree; - sfs->f_bavail = osfs.os_bavail; + if (sbi->ll_remote) { + struct remote_acl *racl = lli->lli_remote_acl; - return 0; -} + LASSERT(!lli->lli_posix_acl); + if (racl) { + down(&racl->ra_update_sem); + spin_lock(&racl->ra_lock); + remote_acl_flush(lli->lli_remote_acl); + spin_unlock(&racl->ra_lock); + up(&racl->ra_update_sem); + } + } else { + /* we can't invalide acl here: suppose we touch a new file + * under a dir, blocking ast on dir will lead to open failure + * on client, although succeed on mds. it's kind of weird, + * the real fix i think is improve client-vfs interaction. + * + * currently we just do nothing here. + */ + return; -void dump_lsm(int level, struct lov_stripe_md *lsm) -{ - CDEBUG(level, "objid "LPX64", maxbytes "LPX64", magic %#08x, " - "stripe_size %#08x, offset %u, stripe_count %u\n", - lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic, - lsm->lsm_stripe_size, lsm->lsm_stripe_offset, - lsm->lsm_stripe_count); + LASSERT(!lli->lli_remote_acl); + spin_lock(&lli->lli_lock); + posix_acl_release(lli->lli_posix_acl); + lli->lli_posix_acl = NULL; + spin_unlock(&lli->lli_lock); + } } -void ll_update_inode(struct inode *inode, struct mds_body *body, - struct lov_stripe_md *lsm) +void ll_update_inode(struct inode *inode, struct lustre_md *md) { struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = md->lsm; + struct mds_body *body = md->body; + struct mea *mea = md->mea; + struct posix_acl *posix_acl = md->posix_acl; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_key *mkey = md->key; + ENTRY; + + LASSERT((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); - LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); + if (md->lsm && md->lsm->lsm_magic != LOV_MAGIC) { + /* check for default striping info for dir. */ + LASSERT((mea != NULL) == ((body->valid & OBD_MD_FLDIREA) != 0)); + } + if (lsm != NULL) { + LASSERT(lsm->lsm_object_gr > 0); if (lli->lli_smd == NULL) { lli->lli_smd = lsm; lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { + int i; if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { CERROR("lsm mismatch for inode %ld\n", inode->i_ino); @@ -831,21 +1994,92 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, dump_lsm(D_ERROR, lsm); LBUG(); } + /* XXX FIXME -- We should decide on a safer (atomic) and + * more elegant way to update the lsm */ + for (i = 0; i < lsm->lsm_stripe_count; i++) { + lli->lli_smd->lsm_oinfo[i].loi_id = + lsm->lsm_oinfo[i].loi_id; + lli->lli_smd->lsm_oinfo[i].loi_gr = + lsm->lsm_oinfo[i].loi_gr; + lli->lli_smd->lsm_oinfo[i].loi_ost_idx = + lsm->lsm_oinfo[i].loi_ost_idx; + lli->lli_smd->lsm_oinfo[i].loi_ost_gen = + lsm->lsm_oinfo[i].loi_ost_gen; + } + } + /* bug 2844 - limit i_blksize for broken user-space apps */ + LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize); + inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE); + if (lli->lli_smd != lsm) + obd_free_memmd(ll_i2dtexp(inode), &lsm); + } + + if (mea != NULL) { + if (lli->lli_mea == NULL) { + lli->lli_mea = mea; + } else { + if (memcmp(lli->lli_mea, mea, body->eadatasize)) { + CERROR("mea mismatch for inode %lu\n", + inode->i_ino); + LBUG(); + } + } + if (lli->lli_mea != mea) + obd_free_memmd(ll_i2mdexp(inode), + (struct lov_stripe_md **) &mea); + } + + if (body->valid & OBD_MD_FID) + id_assign_fid(&lli->lli_id, &body->id1); + + if (body->valid & OBD_MD_FLID) + id_ino(&lli->lli_id) = id_ino(&body->id1); + + if (body->valid & OBD_MD_FLGENER) + id_gen(&lli->lli_id) = id_gen(&body->id1); + + /* local/remote ACL */ + if (sbi->ll_remote) { + LASSERT(md->posix_acl == NULL); + if (md->remote_perm) { + ll_remote_acl_update(inode, md->remote_perm); + OBD_FREE(md->remote_perm, sizeof(*md->remote_perm)); + md->remote_perm = NULL; + } + } else { + LASSERT(md->remote_perm == NULL); + spin_lock(&lli->lli_lock); + if (posix_acl != NULL) { + if (lli->lli_posix_acl != NULL) + posix_acl_release(lli->lli_posix_acl); + lli->lli_posix_acl = posix_acl; } + spin_unlock(&lli->lli_lock); } if (body->valid & OBD_MD_FLID) - inode->i_ino = body->ino; + inode->i_ino = id_ino(&body->id1); + if (body->valid & OBD_MD_FLGENER) + inode->i_generation = id_gen(&body->id1); if (body->valid & OBD_MD_FLATIME) LTIME_S(inode->i_atime) = body->atime; - if (body->valid & OBD_MD_FLMTIME) + if (body->valid & OBD_MD_FLMTIME && + body->mtime > LTIME_S(inode->i_mtime)) { + CDEBUG(D_INODE, "setting ino %lu mtime from %lu to %u\n", + inode->i_ino, LTIME_S(inode->i_mtime), body->mtime); LTIME_S(inode->i_mtime) = body->mtime; - if (body->valid & OBD_MD_FLCTIME) + } + if (body->valid & OBD_MD_FLCTIME && + body->ctime > LTIME_S(inode->i_ctime)) LTIME_S(inode->i_ctime) = body->ctime; - if (body->valid & OBD_MD_FLMODE) - inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT); - if (body->valid & OBD_MD_FLTYPE) - inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT); + if (body->valid & OBD_MD_FLMODE) { + inode->i_mode = (inode->i_mode & S_IFMT) | + (body->mode & ~S_IFMT); + } + if (body->valid & OBD_MD_FLTYPE) { + inode->i_mode = (inode->i_mode & ~S_IFMT) | + (body->mode & S_IFMT); + } if (body->valid & OBD_MD_FLUID) inode->i_uid = body->uid; if (body->valid & OBD_MD_FLGID) @@ -854,20 +2088,46 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, inode->i_flags = body->flags; if (body->valid & OBD_MD_FLNLINK) inode->i_nlink = body->nlink; - if (body->valid & OBD_MD_FLGENER) - inode->i_generation = body->generation; if (body->valid & OBD_MD_FLRDEV) #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) inode->i_rdev = body->rdev; #else - inode->i_rdev = to_kdev_t(body->rdev); + inode->i_rdev = old_decode_dev(body->rdev); #endif if (body->valid & OBD_MD_FLSIZE) inode->i_size = body->size; if (body->valid & OBD_MD_FLBLOCKS) inode->i_blocks = body->blocks; + + if (body->valid & OBD_MD_FLSIZE) + set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + + if (body->valid & OBD_MD_FLAUDIT) { + struct ll_sb_info * sbi = ll_s2sbi(inode->i_sb); + if (IS_AUDIT_OP(body->audit, AUDIT_FS)) + sbi->ll_audit_mask = body->audit; + else + lli->lli_audit_mask = body->audit; + } + + if (mkey != NULL) { + LASSERT(body->valid & OBD_MD_FLKEY); + ll_crypto_init_inode_key(inode, mkey); + } + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + inode->i_dev = (kdev_t)id_group(&lli->lli_id); +#endif + LASSERT(id_fid(&lli->lli_id) != 0); } +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) +static struct backing_dev_info ll_backing_dev_info = { + .ra_pages = 0, /* No readahead */ + .memory_backed = 0, /* Does contribute to dirty memory */ +}; +#endif + void ll_read_inode2(struct inode *inode, void *opaque) { struct lustre_md *md = opaque; @@ -881,8 +2141,21 @@ void ll_read_inode2(struct inode *inode, void *opaque) LASSERT(!lli->lli_smd); - /* core attributes from the MDS first */ - ll_update_inode(inode, md->body, md->lsm); + if (ll_i2sbi(inode)->ll_remote) { + lli->lli_remote_acl = remote_acl_alloc(); + /* if failed alloc, nobody will be able to access this inode */ + } + + /* Core attributes from the MDS first. This is a new inode, and + * the VFS doesn't zero times in the core inode so we have to do + * it ourselves. They will be overwritten by either MDS or OST + * attributes - we just need to make sure they aren't newer. */ + LTIME_S(inode->i_mtime) = 0; + LTIME_S(inode->i_atime) = 0; + LTIME_S(inode->i_ctime) = 0; + + inode->i_rdev = 0; + ll_update_inode(inode, md); /* OIDEBUG(inode); */ @@ -901,51 +2174,201 @@ void ll_read_inode2(struct inode *inode, void *opaque) EXIT; } else { inode->i_op = &ll_special_inode_operations; + #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - init_special_inode(inode, inode->i_mode, + init_special_inode(inode, inode->i_mode, kdev_t_to_nr(inode->i_rdev)); + + /* initializing backing dev info. */ + inode->i_mapping->backing_dev_info = &ll_backing_dev_info; #else init_special_inode(inode, inode->i_mode, inode->i_rdev); #endif + lli->ll_save_ifop = inode->i_fop; + + if (S_ISCHR(inode->i_mode)) + inode->i_fop = &ll_special_chr_inode_fops; + else if (S_ISBLK(inode->i_mode)) + inode->i_fop = &ll_special_blk_inode_fops; + else if (S_ISFIFO(inode->i_mode)) + inode->i_fop = &ll_special_fifo_inode_fops; + else if (S_ISSOCK(inode->i_mode)) + inode->i_fop = &ll_special_sock_inode_fops; + + CWARN("saved %p, replaced with %p\n", lli->ll_save_ifop, + inode->i_fop); + + if (lli->ll_save_ifop->owner) { + CWARN("%p has owner %p\n", lli->ll_save_ifop, + lli->ll_save_ifop->owner); + } EXIT; } } -int it_disposition(struct lookup_intent *it, int flag) +void ll_delete_inode(struct inode *inode) { - return it->it_disposition & flag; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_id id; + int rc; + ENTRY; + + ll_inode2id(&id, inode); + + rc = md_delete_inode(sbi->ll_md_exp, &id); + if (rc) { + CERROR("md_delete_inode() failed, error %d\n", + rc); + } + + clear_inode(inode); + EXIT; } -void it_set_disposition(struct lookup_intent *it, int flag) +int ll_iocontrol(struct inode *inode, struct file *file, + unsigned int cmd, unsigned long arg) { - it->it_disposition |= flag; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *req = NULL; + int rc, flags = 0; + ENTRY; + + switch(cmd) { + case EXT3_IOC_GETFLAGS: { + struct lustre_id id; + __u64 valid = OBD_MD_FLFLAGS; + struct mds_body *body; + + ll_inode2id(&id, inode); + rc = md_getattr(sbi->ll_md_exp, &id, valid, NULL, NULL, + 0, 0, NULL, &req); + if (rc) { + CERROR("failure %d inode %lu\n", rc, inode->i_ino); + RETURN(-abs(rc)); + } + + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); + + if (body->flags & S_APPEND) + flags |= EXT3_APPEND_FL; + if (body->flags & S_IMMUTABLE) + flags |= EXT3_IMMUTABLE_FL; + if (body->flags & S_NOATIME) + flags |= EXT3_NOATIME_FL; + + ptlrpc_req_finished (req); + + RETURN(put_user(flags, (int *)arg)); + } + case EXT3_IOC_SETFLAGS: { + struct mdc_op_data *op_data; + struct iattr attr; + struct obdo *oa; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + + if (get_user(flags, (int *)arg)) + RETURN(-EFAULT); + + oa = obdo_alloc(); + if (!oa) + RETURN(-ENOMEM); + + OBD_ALLOC(op_data, sizeof(*op_data)); + if (op_data == NULL) { + obdo_free(oa); + RETURN(-ENOMEM); + } + ll_inode2mdc_data(op_data, inode, (OBD_MD_FLID | OBD_MD_MEA)); + + memset(&attr, 0x0, sizeof(attr)); + attr.ia_attr_flags = flags; + attr.ia_valid |= ATTR_ATTR_FLAG; + + rc = md_setattr(sbi->ll_md_exp, op_data, + &attr, NULL, 0, NULL, 0, NULL, 0, &req); + OBD_FREE(op_data, sizeof(*op_data)); + if (rc) { + ptlrpc_req_finished(req); + if (rc != -EPERM && rc != -EACCES) + CERROR("md_setattr fails: rc = %d\n", rc); + obdo_free(oa); + RETURN(rc); + } + ptlrpc_req_finished(req); + + oa->o_id = lsm->lsm_object_id; + oa->o_gr = lsm->lsm_object_gr; + oa->o_flags = flags; + *(obdo_id(oa)) = ll_i2info(inode)->lli_id; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP + | OBD_MD_FLIFID; + + rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL, NULL); + obdo_free(oa); + if (rc) { + if (rc != -EPERM && rc != -EACCES) + CERROR("md_setattr fails: rc = %d\n", rc); + RETURN(rc); + } + + if (flags & EXT3_APPEND_FL) + inode->i_flags |= S_APPEND; + else + inode->i_flags &= ~S_APPEND; + if (flags & EXT3_IMMUTABLE_FL) + inode->i_flags |= S_IMMUTABLE; + else + inode->i_flags &= ~S_IMMUTABLE; + if (flags & EXT3_NOATIME_FL) + inode->i_flags |= S_NOATIME; + else + inode->i_flags &= ~S_NOATIME; + + RETURN(0); + } + default: + RETURN(-ENOSYS); + } + + RETURN(0); } +/* this is only called in the case of forced umount. */ void ll_umount_begin(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_device *obd; struct obd_ioctl_data ioc_data = { 0 }; + struct obd_device *obd; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:\n"); - - obd = class_conn2obd(&sbi->ll_mdc_conn); + + CDEBUG(D_VFSTRACE, "VFS Op: superblock %p count %d active %d\n", sb, + sb->s_count, atomic_read(&sb->s_active)); + + obd = class_exp2obd(sbi->ll_md_exp); if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", - sbi->ll_mdc_conn.cookie); + sbi->ll_md_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_mdc_conn, sizeof ioc_data, - &ioc_data, NULL); + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, + sizeof(ioc_data), &ioc_data, NULL); + + obd = class_exp2obd(sbi->ll_dt_exp); + if (obd == NULL) { + CERROR("Invalid LOV connection handle "LPX64"\n", + sbi->ll_dt_exp->exp_handle.h_cookie); + EXIT; + return; + } - obd = class_conn2obd(&sbi->ll_osc_conn); obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_osc_conn, sizeof ioc_data, - &ioc_data, NULL); + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, + sizeof(ioc_data), &ioc_data, NULL); - /* Really, we'd like to wait until there are no requests outstanding, + /* + * really, we'd like to wait until there are no requests outstanding, * and then continue. For now, we just invalidate the requests, * schedule, and hope. */ @@ -953,3 +2376,104 @@ void ll_umount_begin(struct super_block *sb) EXIT; } + +int ll_prep_inode(struct obd_export *dt_exp, struct obd_export *md_exp, + struct inode **inode, struct ptlrpc_request *req, + int offset, struct super_block *sb) +{ + struct lustre_md md; + int rc = 0; + + rc = mdc_req2lustre_md(md_exp, req, offset, dt_exp, &md); + if (rc) + RETURN(rc); + + if (*inode) { + ll_update_inode(*inode, &md); + } else { + LASSERT(sb); + *inode = ll_iget(sb, id_ino(&md.body->id1), &md); + if (*inode == NULL || is_bad_inode(*inode)) { + /* free the lsm if we allocated one above */ + if (md.lsm != NULL) + obd_free_memmd(dt_exp, &md.lsm); + if (md.mea != NULL) + obd_free_memmd(md_exp, + (struct lov_stripe_md**)&md.mea); + rc = -ENOMEM; + CERROR("new_inode -fatal: rc %d\n", rc); + } + } + + RETURN(rc); +} + +int ll_show_options(struct seq_file *m, struct vfsmount *mnt) +{ + struct ll_sb_info *sbi = ll_s2sbi(mnt->mnt_sb); + struct lustre_mount_data *lmd = sbi->ll_lmd; + + if (lmd) { + seq_printf(m, ",mds_sec=%s,oss_sec=%s", + lmd->lmd_mds_security, lmd->lmd_oss_security); + } + seq_printf(m, ",%s", sbi->ll_remote ? "remote" : "local"); + if (sbi->ll_remote && lmd) + seq_printf(m, ",nllu=%u:%u", lmd->lmd_nllu, lmd->lmd_nllg); + + if (lmd && lmd->lmd_pag) + seq_printf(m, ",pag"); + + return 0; +} + +int ll_get_fid(struct obd_export *exp, struct lustre_id *idp, + char *filename, struct lustre_id *ret) +{ + struct ptlrpc_request *request = NULL; + struct mds_body *body; + int rc; + + rc = md_getattr_lock(exp, idp, filename, strlen(filename) + 1, + OBD_MD_FID, 0, &request); + if (rc < 0) { + CDEBUG(D_INFO, "md_getattr_lock failed on %s: rc %d\n", + filename, rc); + return rc; + } + + body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); + LASSERT(body != NULL); + LASSERT_REPSWABBED(request, 0); + + *ret = body->id1; + ptlrpc_req_finished(request); + + return rc; +} +int ll_flush_cred(struct inode *inode) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + int rc = 0; + + /* XXX to avoid adding api, we simply use set_info() interface + * to notify underlying obds. set_info() is more like a ioctl() now... + */ + if (sbi->ll_md_exp) { + rc = obd_set_info(sbi->ll_md_exp, + strlen("flush_cred"), "flush_cred", + 0, NULL); + if (rc) + return rc; + } + + if (sbi->ll_dt_exp) { + rc = obd_set_info(sbi->ll_dt_exp, + strlen("flush_cred"), "flush_cred", + 0, NULL); + if (rc) + return rc; + } + + return rc; +}