X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fllite%2Fllite_lib.c;h=bef852f17271102cc51bd1b3cd24079a0ceb71f5;hb=fafe847f90b989452ce8812d599e59276728d25a;hp=bf8fb4c449e5ab3066fc852032357bc533e6e615;hpb=9f59ef764e80542d82f3b78902615124ddac1020;p=fs%2Flustre-release.git diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index bf8fb4c..bef852f 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -53,12 +53,25 @@ struct ll_sb_info *lustre_init_sbi(struct super_block *sb) if (!sbi) RETURN(NULL); - spin_lock_init(&sbi->ll_pglist_lock); + spin_lock_init(&sbi->ll_lock); INIT_LIST_HEAD(&sbi->ll_pglist); sbi->ll_pglist_gen = 0; + if (num_physpages < SBI_DEFAULT_RA_MAX / 4) + sbi->ll_ra_info.ra_max_pages = num_physpages / 4; + else + sbi->ll_ra_info.ra_max_pages = SBI_DEFAULT_RA_MAX; INIT_LIST_HEAD(&sbi->ll_conn_chain); INIT_HLIST_HEAD(&sbi->ll_orphan_dentry_list); - ll_s2sbi(sb) = sbi; + INIT_LIST_HEAD(&sbi->ll_mnt_list); + sema_init(&sbi->ll_gns_sem, 1); + init_completion(&sbi->ll_gns_completion); + sbi->ll_gns_state = LL_GNS_STATE_IDLE; + sbi->ll_gns_timer.data = (unsigned long)sbi; + sbi->ll_gns_timer.function = ll_gns_timer_callback; + init_timer(&sbi->ll_gns_timer); + INIT_LIST_HEAD(&sbi->ll_gns_sbi_head); + + ll_set_sbi(sb, sbi); generate_random_uuid(uuid); class_uuid_unparse(uuid, &sbi->ll_sb_uuid); @@ -70,118 +83,143 @@ void lustre_free_sbi(struct super_block *sb) struct ll_sb_info *sbi = ll_s2sbi(sb); ENTRY; - if (sbi != NULL) + if (sbi != NULL) { + list_del(&sbi->ll_gns_sbi_head); + del_timer(&sbi->ll_gns_timer); OBD_FREE(sbi, sizeof(*sbi)); - ll_s2sbi(sb) = NULL; + } + ll_set_sbi(sb, NULL); EXIT; } -int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) +int lustre_init_dt_desc(struct ll_sb_info *sbi) +{ + int valsize, rc; + ENTRY; + + valsize = sizeof(sbi->ll_dt_desc); + memset(&sbi->ll_dt_desc, 0, sizeof(sbi->ll_dt_desc)); + rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, + "lovdesc", &valsize, &sbi->ll_dt_desc); + RETURN(rc); +} + +int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov) { - struct inode *root = 0; struct ll_sb_info *sbi = ll_s2sbi(sb); + struct ptlrpc_request *request = NULL; + struct lustre_handle dt_conn = {0, }; + struct lustre_handle md_conn = {0, }; + struct inode *root = NULL; struct obd_device *obd; - struct ll_fid rootfid; struct obd_statfs osfs; - struct ptlrpc_request *request = NULL; - struct lustre_handle osc_conn = {0, }; - struct lustre_handle mdc_conn = {0, }; struct lustre_md md; kdev_t devno; int err; + ENTRY; - obd = class_name2obd(mdc); + obd = class_name2obd(lmv); if (!obd) { - CERROR("MDC %s: not setup or attached\n", mdc); + CERROR("MDC %s: not setup or attached\n", lmv); RETURN(-EINVAL); } if (proc_lustre_fs_root) { err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb, - osc, mdc); + lov, lmv); if (err < 0) CERROR("could not register mount in /proc/lustre"); } - mdc_init_ea_size(obd, osc); - - err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&md_conn, obd, &sbi->ll_sb_uuid, OBD_OPT_REAL_CLIENT); if (err == -EBUSY) { CERROR("An MDS (mdc %s) is performing recovery, of which this" " client is not a part. Please wait for recovery to " - "complete, abort, or time out.\n", mdc); + "complete, abort, or time out.\n", lmv); GOTO(out, err); } else if (err) { - CERROR("cannot connect to %s: rc = %d\n", mdc, err); + CERROR("cannot connect to %s: rc = %d\n", lmv, err); GOTO(out, err); } - sbi->ll_mdc_exp = class_conn2export(&mdc_conn); - + sbi->ll_md_exp = class_conn2export(&md_conn); err = obd_statfs(obd, &osfs, jiffies - HZ); if (err) - GOTO(out_mdc, err); + GOTO(out_lmv, err); + + if (!osfs.os_bsize) { + CERROR("Invalid block size is detected."); + GOTO(out_lmv, err); + } - LASSERT(osfs.os_bsize); + sb->s_magic = LL_SUPER_MAGIC; sb->s_blocksize = osfs.os_bsize; sb->s_blocksize_bits = log2(osfs.os_bsize); - sb->s_magic = LL_SUPER_MAGIC; sb->s_maxbytes = PAGE_CACHE_MAXBYTES; + + devno = get_uuid2int(sbi->ll_md_exp->exp_obd->obd_uuid.uuid, + strlen(sbi->ll_md_exp->exp_obd->obd_uuid.uuid)); - devno = get_uuid2int(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid, - strlen(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid)); sb->s_dev = devno; - obd = class_name2obd(osc); + obd = class_name2obd(lov); if (!obd) { - CERROR("OSC %s: not setup or attached\n", osc); - GOTO(out_mdc, err); + CERROR("OSC %s: not setup or attached\n", lov); + GOTO(out_lmv, err); } - err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&dt_conn, obd, &sbi->ll_sb_uuid, OBD_OPT_REAL_CLIENT); if (err == -EBUSY) { CERROR("An OST (osc %s) is performing recovery, of which this" " client is not a part. Please wait for recovery to " - "complete, abort, or time out.\n", osc); + "complete, abort, or time out.\n", lov); GOTO(out, err); } else if (err) { - CERROR("cannot connect to %s: rc = %d\n", osc, err); - GOTO(out_mdc, err); + CERROR("cannot connect to %s: rc = %d\n", lov, err); + GOTO(out_lmv, err); } - sbi->ll_osc_exp = class_conn2export(&osc_conn); - - err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); + sbi->ll_dt_exp = class_conn2export(&dt_conn); + + err = lustre_init_dt_desc(sbi); + if (err == 0) { + int mdsize = obd_size_diskmd(sbi->ll_dt_exp, NULL); + obd_init_ea_size(sbi->ll_md_exp, mdsize, + sbi->ll_dt_desc.ld_tgt_count * + sizeof(struct llog_cookie)); + } + + err = md_getstatus(sbi->ll_md_exp, &sbi->ll_rootid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_osc, err); + GOTO(out_lov, err); } - CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id); - sbi->ll_rootino = rootfid.id; + CDEBUG(D_SUPER, "rootid "DLID4"\n", OLID4(&sbi->ll_rootid)); sb->s_op = &lustre_super_operations; /* make root inode * XXX: move this to after cbd setup? */ - err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, - OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); + err = md_getattr(sbi->ll_md_exp, &sbi->ll_rootid, + OBD_MD_FLNOTOBD | OBD_MD_FLBLOCKS, + 0, &request); if (err) { - CERROR("mdc_getattr failed for root: rc = %d\n", err); - GOTO(out_osc, err); + CERROR("md_getattr failed for root: rc = %d\n", err); + GOTO(out_lov, err); } - err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); + err = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, + sbi->ll_dt_exp, &md); if (err) { - CERROR("failed to understand root inode md: rc = %d\n",err); - ptlrpc_req_finished (request); - GOTO(out_osc, err); + CERROR("failed to understand root inode md: rc = %d\n", err); + ptlrpc_req_finished(request); + GOTO(out_lov, err); } - LASSERT(sbi->ll_rootino != 0); - root = ll_iget(sb, sbi->ll_rootino, &md); + LASSERT(id_ino(&sbi->ll_rootid) != 0); + root = ll_iget(sb, id_ino(&sbi->ll_rootid), &md); ptlrpc_req_finished(request); if (root == NULL || is_bad_inode(root)) { - /* XXX might need iput() for bad inode */ CERROR("lustre_lite: bad iget4 for root\n"); GOTO(out_root, err = -EBADF); } @@ -192,27 +230,34 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) GOTO(out_root, err); } - /* making vm readahead 0 for 2.4.x. In the case of 2.6.x, - backing dev info assigned to inode mapping is used for - determining maximal readahead. */ + ll_gns_add_timer(sbi); + + /* making vm readahead 0 for 2.4.x. In the case of 2.6.x, backing dev + info assigned to inode mapping is used for determining maximal + readahead. */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)) /* bug 2805 - set VM readahead to zero */ vm_max_readahead = vm_min_readahead = 0; #endif sb->s_root = d_alloc_root(root); - RETURN(err); +#ifdef S_PDIROPS + CWARN("Enabling PDIROPS\n"); + sb->s_flags |= S_PDIROPS; +#endif + + RETURN(err); out_root: if (root) iput(root); -out_osc: - obd_disconnect(sbi->ll_osc_exp, 0); -out_mdc: - obd_disconnect(sbi->ll_mdc_exp, 0); +out_lov: + obd_disconnect(sbi->ll_dt_exp, 0); +out_lmv: + obd_disconnect(sbi->ll_md_exp, 0); out: lprocfs_unregister_mountpoint(sbi); - RETURN(err); + return err; } void lustre_common_put_super(struct super_block *sb) @@ -221,10 +266,11 @@ void lustre_common_put_super(struct super_block *sb) struct hlist_node *tmp, *next; ENTRY; + ll_gns_del_timer(sbi); ll_close_thread_shutdown(sbi->ll_lcq); list_del(&sbi->ll_conn_chain); - obd_disconnect(sbi->ll_osc_exp, 0); + obd_disconnect(sbi->ll_dt_exp, 0); lprocfs_unregister_mountpoint(sbi); if (sbi->ll_proc_root) { @@ -232,19 +278,20 @@ void lustre_common_put_super(struct super_block *sb) sbi->ll_proc_root = NULL; } - obd_disconnect(sbi->ll_mdc_exp, 0); + obd_disconnect(sbi->ll_md_exp, 0); // We do this to get rid of orphaned dentries. That is not really trw. spin_lock(&dcache_lock); hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash); + CWARN("orphan dentry %*s (%p) at unmount\n", + dentry->d_name.len, dentry->d_name.name, dentry); shrink_dcache_parent(dentry); } spin_unlock(&dcache_lock); EXIT; } - char *ll_read_opt(const char *opt, char *data) { char *value; @@ -280,7 +327,7 @@ int ll_set_opt(const char *opt, char *data, int fl) RETURN(fl); } -void ll_options(char *options, char **ost, char **mdc, int *flags) +void ll_options(char *options, char **lov, char **lmv, int *flags) { char *this_char; #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) @@ -301,9 +348,9 @@ void ll_options(char *options, char **ost, char **mdc, int *flags) while ((this_char = strsep (&opt_ptr, ",")) != NULL) { #endif CDEBUG(D_SUPER, "this_char %s\n", this_char); - if (!*ost && (*ost = ll_read_opt("osc", this_char))) + if (!*lov && (*lov = ll_read_opt("osc", this_char))) continue; - if (!*mdc && (*mdc = ll_read_opt("mdc", this_char))) + if (!*lmv && (*lmv = ll_read_opt("mdc", this_char))) continue; if (!(*flags & LL_SBI_NOLCK) && ((*flags) = (*flags) | @@ -311,6 +358,7 @@ void ll_options(char *options, char **ost, char **mdc, int *flags) LL_SBI_NOLCK))) continue; } + EXIT; } @@ -321,13 +369,15 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; spin_lock_init(&lli->lli_lock); INIT_LIST_HEAD(&lli->lli_pending_write_llaps); + lli->lli_inode_magic = LLI_INODE_MAGIC; + memset(&lli->lli_id, 0, sizeof(lli->lli_id)); } int ll_fill_super(struct super_block *sb, void *data, int silent) { struct ll_sb_info *sbi; - char *osc = NULL; - char *mdc = NULL; + char *lov = NULL; + char *lmv = NULL; int err; ENTRY; @@ -338,43 +388,43 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) RETURN(-ENOMEM); sbi->ll_flags |= LL_SBI_READAHEAD; - ll_options(data, &osc, &mdc, &sbi->ll_flags); + ll_options(data, &lov, &lmv, &sbi->ll_flags); - if (!osc) { + if (!lov) { CERROR("no osc\n"); GOTO(out, err = -EINVAL); } - if (!mdc) { + if (!lmv) { CERROR("no mdc\n"); GOTO(out, err = -EINVAL); } - err = lustre_common_fill_super(sb, mdc, osc); + err = lustre_common_fill_super(sb, lmv, lov); + EXIT; out: if (err) lustre_free_sbi(sb); - if (mdc) - OBD_FREE(mdc, strlen(mdc) + 1); - if (osc) - OBD_FREE(osc, strlen(osc) + 1); - - RETURN(err); + if (lmv) + OBD_FREE(lmv, strlen(lmv) + 1); + if (lov) + OBD_FREE(lov, strlen(lov) + 1); + return err; } /* ll_read_super */ -int lustre_process_log(struct lustre_mount_data *lmd, char * profile, - struct config_llog_instance *cfg, int allow_recov) +static int lustre_process_log(struct lustre_mount_data *lmd, char *profile, + struct config_llog_instance *cfg, int allow_recov) { struct lustre_cfg lcfg; struct portals_cfg pcfg; - char * peer = "MDS_PEER_UUID"; + char *peer = "MDS_PEER_UUID"; struct obd_device *obd; - struct lustre_handle mdc_conn = {0, }; + struct lustre_handle md_conn = {0, }; struct obd_export *exp; - char * name = "mdc_dev"; + char *name = "mdc_dev"; class_uuid_t uuid; - struct obd_uuid mdc_uuid; + struct obd_uuid lmv_uuid; struct llog_ctxt *ctxt; int rc = 0; int err; @@ -384,7 +434,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, RETURN(-EINVAL); generate_random_uuid(uuid); - class_uuid_unparse(uuid, &mdc_uuid); + class_uuid_unparse(uuid, &lmv_uuid); if (lmd->lmd_local_nid) { PCFG_INIT(pcfg, NAL_CMD_REGISTER_MYNID); @@ -395,8 +445,9 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, GOTO(out, err); } - if (lmd->lmd_nal == SOCKNAL) { - PCFG_INIT(pcfg, NAL_CMD_ADD_AUTOCONN); + if (lmd->lmd_nal == SOCKNAL || + lmd->lmd_nal == OPENIBNAL) { + PCFG_INIT(pcfg, NAL_CMD_ADD_PEER); pcfg.pcfg_nal = lmd->lmd_nal; pcfg.pcfg_nid = lmd->lmd_server_nid; pcfg.pcfg_id = lmd->lmd_server_ipaddr; @@ -420,7 +471,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, LCFG_INIT(lcfg, LCFG_ATTACH, name); lcfg.lcfg_inlbuf1 = "mdc"; lcfg.lcfg_inllen1 = strlen(lcfg.lcfg_inlbuf1) + 1; - lcfg.lcfg_inlbuf2 = mdc_uuid.uuid; + lcfg.lcfg_inlbuf2 = lmv_uuid.uuid; lcfg.lcfg_inllen2 = strlen(lcfg.lcfg_inlbuf2) + 1; err = class_process_config(&lcfg); if (err < 0) @@ -446,29 +497,22 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, if (err) GOTO(out_cleanup, err); - err = obd_connect(&mdc_conn, obd, &mdc_uuid); + err = obd_connect(&md_conn, obd, &lmv_uuid, 0); if (err) { CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, err); GOTO(out_cleanup, err); } - exp = class_conn2export(&mdc_conn); + exp = class_conn2export(&md_conn); - ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); -#if 1 - rc = class_config_parse_llog(ctxt, profile, cfg); -#else - /* - * For debugging, it's useful to just dump the log - */ - rc = class_config_dump_llog(ctxt, profile, cfg); -#endif - if (rc) { - CERROR("class_config_parse_llog failed: rc = %d\n", rc); - } + ctxt = llog_get_context(&exp->exp_obd->obd_llogs,LLOG_CONFIG_REPL_CTXT); + rc = class_config_process_llog(ctxt, profile, cfg); + if (rc) + CERROR("class_config_process_llog failed: rc = %d\n", rc); err = obd_disconnect(exp, 0); - + + EXIT; out_cleanup: LCFG_INIT(lcfg, LCFG_CLEANUP, name); err = class_process_config(&lcfg); @@ -488,12 +532,12 @@ out_del_uuid: err = class_process_config(&lcfg); out_del_conn: - if (lmd->lmd_nal == SOCKNAL) { - PCFG_INIT(pcfg, NAL_CMD_DEL_AUTOCONN); + if (lmd->lmd_nal == SOCKNAL || + lmd->lmd_nal == OPENIBNAL) { + PCFG_INIT(pcfg, NAL_CMD_DEL_PEER); pcfg.pcfg_nal = lmd->lmd_nal; pcfg.pcfg_nid = lmd->lmd_server_nid; - pcfg.pcfg_id = lmd->lmd_server_ipaddr; - pcfg.pcfg_flags = 1; /*share*/ + pcfg.pcfg_flags = 1; /* single_share */ err = libcfs_nal_cmd(&pcfg); if (err <0) GOTO(out, err); @@ -502,15 +546,14 @@ out: if (rc == 0) rc = err; - RETURN(rc); + return rc; } int lustre_fill_super(struct super_block *sb, void *data, int silent) { struct lustre_mount_data * lmd = data; + char *lov = NULL, *lmv = NULL; struct ll_sb_info *sbi; - char *osc = NULL; - char *mdc = NULL; int err; ENTRY; @@ -553,7 +596,6 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) err = lustre_process_log(lmd, lmd->lmd_profile, &cfg, 0); if (err < 0) { CERROR("Unable to process log: %s\n", lmd->lmd_profile); - GOTO(out_free, err); } @@ -562,39 +604,39 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) CERROR("No profile found: %s\n", lmd->lmd_profile); GOTO(out_free, err = -EINVAL); } - if (osc) - OBD_FREE(osc, strlen(osc) + 1); - OBD_ALLOC(osc, strlen(lprof->lp_osc) + + if (lov) + OBD_FREE(lov, strlen(lov) + 1); + OBD_ALLOC(lov, strlen(lprof->lp_lov) + strlen(sbi->ll_instance) + 2); - sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance); + sprintf(lov, "%s-%s", lprof->lp_lov, sbi->ll_instance); - if (mdc) - OBD_FREE(mdc, strlen(mdc) + 1); - OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + + if (lmv) + OBD_FREE(lmv, strlen(lmv) + 1); + OBD_ALLOC(lmv, strlen(lprof->lp_lmv) + strlen(sbi->ll_instance) + 2); - sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance); + sprintf(lmv, "%s-%s", lprof->lp_lmv, sbi->ll_instance); } - if (!osc) { + if (!lov) { CERROR("no osc\n"); GOTO(out_free, err = -EINVAL); } - if (!mdc) { + if (!lmv) { CERROR("no mdc\n"); GOTO(out_free, err = -EINVAL); } - err = lustre_common_fill_super(sb, mdc, osc); + err = lustre_common_fill_super(sb, lmv, lov); if (err) GOTO(out_free, err); - + out_dev: - if (mdc) - OBD_FREE(mdc, strlen(mdc) + 1); - if (osc) - OBD_FREE(osc, strlen(osc) + 1); + if (lmv) + OBD_FREE(lmv, strlen(lmv) + 1); + if (lov) + OBD_FREE(lov, strlen(lov) + 1); RETURN(err); @@ -604,6 +646,7 @@ out_free: int err; if (sbi->ll_instance != NULL) { + struct lustre_mount_data *lmd = sbi->ll_lmd; char * cln_prof; struct config_llog_instance cfg; @@ -611,10 +654,9 @@ out_free: cfg.cfg_uuid = sbi->ll_sb_uuid; OBD_ALLOC(cln_prof, len); - sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile); + sprintf(cln_prof, "%s-clean", lmd->lmd_profile); - err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, - 0); + err = lustre_process_log(lmd, cln_prof, &cfg, 0); if (err < 0) CERROR("Unable to process log: %s\n", cln_prof); OBD_FREE(cln_prof, len); @@ -664,13 +706,12 @@ void lustre_put_super(struct super_block *sb) ENTRY; CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); - obd = class_exp2obd(sbi->ll_mdc_exp); + obd = class_exp2obd(sbi->ll_md_exp); if (obd) force_umount = obd->obd_no_recov; obd = NULL; lustre_common_put_super(sb); - if (sbi->ll_lmd != NULL) { char * cln_prof; int len = strlen(sbi->ll_lmd->lmd_profile) + sizeof("-clean")+1; @@ -707,33 +748,108 @@ void lustre_put_super(struct super_block *sb) EXIT; } /* lustre_put_super */ +int ll_process_config_update(struct ll_sb_info *sbi, int clean) +{ + struct obd_export *md_exp = sbi->ll_md_exp; + struct lustre_mount_data *lmd = sbi->ll_lmd; + struct llog_ctxt *ctxt; + struct config_llog_instance cfg; + char *profile = lmd->lmd_profile, *name = NULL; + int rc, namelen = 0, version; + ENTRY; + + if (profile == NULL) + RETURN(0); + if (lmd == NULL) { + CERROR("Client not mounted with zero-conf; cannot process " + "update log.\n"); + RETURN(0); + } + + rc = ldlm_cli_cancel_unused(md_exp->exp_obd->obd_namespace, NULL, + LDLM_FL_CONFIG_CHANGE, NULL); + if (rc != 0) + CWARN("ldlm_cli_cancel_unused(mdc): %d\n", rc); + + rc = obd_cancel_unused(sbi->ll_dt_exp, NULL, LDLM_FL_CONFIG_CHANGE, + NULL); + if (rc != 0) + CWARN("obd_cancel_unused(lov): %d\n", rc); + + cfg.cfg_instance = sbi->ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + cfg.cfg_local_nid = lmd->lmd_local_nid; + + namelen = strlen(profile) + 20; /* -clean-######### */ + OBD_ALLOC(name, namelen); + if (name == NULL) + RETURN(-ENOMEM); + + if (clean) { + version = sbi->ll_config_version - 1; + sprintf(name, "%s-clean-%d", profile, version); + } else { + version = sbi->ll_config_version + 1; + sprintf(name, "%s-%d", profile, version); + } + + CWARN("Applying configuration log %s\n", name); + + ctxt = llog_get_context(&md_exp->exp_obd->obd_llogs, + LLOG_CONFIG_REPL_CTXT); + rc = class_config_process_llog(ctxt, name, &cfg); + if (rc == 0) + sbi->ll_config_version = version; + CWARN("Finished applying configuration log %s: %d\n", name, rc); + + if (rc == 0 && clean == 0) { + struct lov_desc desc; + int rc, valsize; + valsize = sizeof(desc); + rc = obd_get_info(sbi->ll_dt_exp, strlen("lovdesc") + 1, + "lovdesc", &valsize, &desc); + + rc = obd_init_ea_size(md_exp, + obd_size_diskmd(sbi->ll_dt_exp, NULL), + (desc.ld_tgt_count * + sizeof(struct llog_cookie))); + } + OBD_FREE(name, namelen); + RETURN(rc); +} struct inode *ll_inode_from_lock(struct ldlm_lock *lock) { - struct inode *inode; + struct inode *inode = NULL; l_lock(&lock->l_resource->lr_namespace->ns_lock); - if (lock->l_ast_data) - inode = igrab(lock->l_ast_data); - else - inode = NULL; + if (lock->l_ast_data) { + struct ll_inode_info *lli = ll_i2info(lock->l_ast_data); + if (lli->lli_inode_magic == LLI_INODE_MAGIC) { + inode = igrab(lock->l_ast_data); + } else { + CERROR("DEBUG: l_ast_data %p is bogus: magic %x\n", + lock->l_ast_data, lli->lli_inode_magic); + } + } l_unlock(&lock->l_resource->lr_namespace->ns_lock); return inode; } -static int null_if_equal(struct ldlm_lock *lock, void *data) +int null_if_equal(struct ldlm_lock *lock, void *data) { - if (data == lock->l_ast_data) + if (data == lock->l_ast_data) { lock->l_ast_data = NULL; - if (lock->l_req_mode != lock->l_granted_mode) - return LDLM_ITER_STOP; + if (lock->l_req_mode != lock->l_granted_mode) + LDLM_ERROR(lock,"clearing inode with ungranted lock\n"); + } return LDLM_ITER_CONTINUE; } void ll_clear_inode(struct inode *inode) { - struct ll_fid fid; + struct lustre_id id; struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); ENTRY; @@ -741,19 +857,27 @@ void ll_clear_inode(struct inode *inode) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - ll_inode2fid(&fid, inode); + lli->lli_inode_magic = LLI_INODE_DEAD; + ll_inode2id(&id, inode); + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); - mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); + md_change_cbdata(sbi->ll_md_exp, &id, null_if_equal, inode); if (lli->lli_smd) - obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd, + obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd, null_if_equal, inode); if (lli->lli_smd) { - obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd); + obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd); lli->lli_smd = NULL; } + if (lli->lli_mea) { + obd_free_memmd(sbi->ll_md_exp, + (struct lov_stripe_md **) &lli->lli_mea); + lli->lli_mea = NULL; + } + if (lli->lli_symlink_name) { OBD_FREE(lli->lli_symlink_name, strlen(lli->lli_symlink_name) + 1); @@ -781,7 +905,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct ll_sb_info *sbi = ll_i2sbi(inode); struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct mdc_op_data *op_data; int ia_valid = attr->ia_valid; int rc = 0; ENTRY; @@ -831,19 +955,24 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * inode ourselves so we can call obdo_from_inode() always. */ if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { struct lustre_md md; - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, - attr, NULL, 0, NULL, 0, &request); + OBD_ALLOC(op_data, sizeof(*op_data)); + if (op_data == NULL) + RETURN(-ENOMEM); + ll_prepare_mdc_data(op_data, inode, NULL, NULL, 0, 0); + rc = md_setattr(sbi->ll_md_exp, op_data, + attr, NULL, 0, NULL, 0, &request); + OBD_FREE(op_data, sizeof(*op_data)); if (rc) { ptlrpc_req_finished(request); if (rc != -EPERM && rc != -EACCES) - CERROR("mdc_setattr fails: rc = %d\n", rc); + CERROR("md_setattr fails: rc = %d\n", rc); RETURN(rc); } - rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); + rc = mdc_req2lustre_md(sbi->ll_md_exp, request, 0, + sbi->ll_dt_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); @@ -852,7 +981,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* Won't invoke vmtruncate as we already cleared ATTR_SIZE, * but needed to set timestamps backwards on utime. */ inode_setattr(inode, attr); - ll_update_inode(inode, md.body, md.lsm); + ll_update_inode(inode, &md); ptlrpc_req_finished(request); if (!lsm || !S_ISREG(inode->i_mode)) { @@ -903,9 +1032,9 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) LASSERT(atomic_read(&inode->i_sem.count) <= 0); up(&inode->i_sem); rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, - ast_flags); + ast_flags, &ll_i2sbi(inode)->ll_seek_stime); down(&inode->i_sem); - if (rc != ELDLM_OK) + if (rc != 0) RETURN(rc); rc = vmtruncate(inode, attr->ia_size); @@ -923,17 +1052,24 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) rc = err; } } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { - struct obdo oa; + struct obdo *oa = NULL; CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", inode->i_ino, LTIME_S(attr->ia_mtime)); - oa.o_id = lsm->lsm_object_id; - oa.o_valid = OBD_MD_FLID; - obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL); + + oa = obdo_alloc(); + if (oa == NULL) + RETURN(-ENOMEM); + + oa->o_id = lsm->lsm_object_id; + oa->o_gr = lsm->lsm_object_gr; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL); + obdo_free(oa); if (rc) - CERROR("obd_setattr fails: rc=%d\n", rc); + CERROR("obd_setattr fails: rc = %d\n", rc); } RETURN(rc); } @@ -952,7 +1088,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, int rc; ENTRY; - rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age); if (rc) { CERROR("mdc_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -963,7 +1099,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_dt_exp), &obd_osfs, max_age); if (rc) { CERROR("obd_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -995,7 +1131,7 @@ int ll_statfs(struct super_block *sb, struct kstatfs *sfs) struct obd_statfs osfs; int rc; - CDEBUG(D_VFSTRACE, "VFS Op:\n"); + CDEBUG(D_VFSTRACE, "VFS Op: superblock %p\n", sb); lprocfs_counter_incr(ll_s2sbi(sb)->ll_stats, LPROC_LL_STAFS); /* For now we will always get up-to-date statfs values, but in the @@ -1024,27 +1160,25 @@ int ll_statfs(struct super_block *sb, struct kstatfs *sfs) return 0; } -void dump_lsm(int level, struct lov_stripe_md *lsm) -{ - CDEBUG(level, "objid "LPX64", maxbytes "LPX64", magic 0x%08X, " - "stripe_size %u, stripe_count %u\n", - lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic, - lsm->lsm_stripe_size, lsm->lsm_stripe_count); -} - -void ll_update_inode(struct inode *inode, struct mds_body *body, - struct lov_stripe_md *lsm) +void ll_update_inode(struct inode *inode, struct lustre_md *md) { struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = md->lsm; + struct mds_body *body = md->body; + struct mea *mea = md->mea; + ENTRY; - LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); + LASSERT((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); + LASSERT((mea != NULL) == ((body->valid & OBD_MD_FLDIREA) != 0)); if (lsm != NULL) { + LASSERT(lsm->lsm_object_gr > 0); if (lli->lli_smd == NULL) { lli->lli_smd = lsm; lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { + int i; if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { CERROR("lsm mismatch for inode %ld\n", inode->i_ino); @@ -1054,16 +1188,51 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, dump_lsm(D_ERROR, lsm); LBUG(); } + /* XXX FIXME -- We should decide on a safer (atomic) and + * more elegant way to update the lsm */ + for (i = 0; i < lsm->lsm_stripe_count; i++) { + lli->lli_smd->lsm_oinfo[i].loi_id = + lsm->lsm_oinfo[i].loi_id; + lli->lli_smd->lsm_oinfo[i].loi_gr = + lsm->lsm_oinfo[i].loi_gr; + lli->lli_smd->lsm_oinfo[i].loi_ost_idx = + lsm->lsm_oinfo[i].loi_ost_idx; + lli->lli_smd->lsm_oinfo[i].loi_ost_gen = + lsm->lsm_oinfo[i].loi_ost_gen; + } } /* bug 2844 - limit i_blksize for broken user-space apps */ LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize); inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE); if (lli->lli_smd != lsm) - obd_free_memmd(ll_i2obdexp(inode), &lsm); + obd_free_memmd(ll_i2dtexp(inode), &lsm); + } + + if (mea != NULL) { + if (lli->lli_mea == NULL) { + lli->lli_mea = mea; + } else { + if (memcmp(lli->lli_mea, mea, body->eadatasize)) { + CERROR("mea mismatch for inode %lu\n", + inode->i_ino); + LBUG(); + } + } + if (lli->lli_mea != mea) + obd_free_memmd(ll_i2mdexp(inode), + (struct lov_stripe_md **) &mea); } + LASSERT(id_fid(&body->id1) != 0); + id_assign_fid(&lli->lli_id, &body->id1); + + if ((body->valid & OBD_MD_FLID) || (body->valid & OBD_MD_FLGENER)) + id_assign_stc(&lli->lli_id, &body->id1); + if (body->valid & OBD_MD_FLID) - inode->i_ino = body->ino; + inode->i_ino = id_ino(&body->id1); + if (body->valid & OBD_MD_FLGENER) + inode->i_generation = id_gen(&body->id1); if (body->valid & OBD_MD_FLATIME) LTIME_S(inode->i_atime) = body->atime; if (body->valid & OBD_MD_FLMTIME && @@ -1075,10 +1244,14 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, if (body->valid & OBD_MD_FLCTIME && body->ctime > LTIME_S(inode->i_ctime)) LTIME_S(inode->i_ctime) = body->ctime; - if (body->valid & OBD_MD_FLMODE) - inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT); - if (body->valid & OBD_MD_FLTYPE) - inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT); + if (body->valid & OBD_MD_FLMODE) { + inode->i_mode = (inode->i_mode & S_IFMT) | + (body->mode & ~S_IFMT); + } + if (body->valid & OBD_MD_FLTYPE) { + inode->i_mode = (inode->i_mode & ~S_IFMT) | + (body->mode & S_IFMT); + } if (body->valid & OBD_MD_FLUID) inode->i_uid = body->uid; if (body->valid & OBD_MD_FLGID) @@ -1087,8 +1260,6 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, inode->i_flags = body->flags; if (body->valid & OBD_MD_FLNLINK) inode->i_nlink = body->nlink; - if (body->valid & OBD_MD_FLGENER) - inode->i_generation = body->generation; if (body->valid & OBD_MD_FLRDEV) #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) inode->i_rdev = body->rdev; @@ -1102,6 +1273,11 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, if (body->valid & OBD_MD_FLSIZE) set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + inode->i_dev = (kdev_t)id_group(&lli->lli_id); +#endif + LASSERT(id_fid(&lli->lli_id) != 0); } #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) @@ -1131,7 +1307,9 @@ void ll_read_inode2(struct inode *inode, void *opaque) LTIME_S(inode->i_mtime) = 0; LTIME_S(inode->i_atime) = 0; LTIME_S(inode->i_ctime) = 0; - ll_update_inode(inode, md->body, md->lsm); + + inode->i_rdev = 0; + ll_update_inode(inode, md); /* OIDEBUG(inode); */ @@ -1182,6 +1360,25 @@ void ll_read_inode2(struct inode *inode, void *opaque) } } +void ll_delete_inode(struct inode *inode) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_id id; + int rc; + ENTRY; + + ll_inode2id(&id, inode); + + rc = md_delete_inode(sbi->ll_md_exp, &id); + if (rc) { + CERROR("md_delete_inode() failed, error %d\n", + rc); + } + + clear_inode(inode); + EXIT; +} + int ll_iocontrol(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg) { @@ -1192,12 +1389,12 @@ int ll_iocontrol(struct inode *inode, struct file *file, switch(cmd) { case EXT3_IOC_GETFLAGS: { - struct ll_fid fid; - unsigned long valid = OBD_MD_FLFLAGS; + struct lustre_id id; + __u64 valid = OBD_MD_FLFLAGS; struct mds_body *body; - ll_inode2fid(&fid, inode); - rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, 0, &req); + ll_inode2id(&id, inode); + rc = md_getattr(sbi->ll_md_exp, &id, valid, 0, &req); if (rc) { CERROR("failure %d inode %lu\n", rc, inode->i_ino); RETURN(-abs(rc)); @@ -1217,7 +1414,7 @@ int ll_iocontrol(struct inode *inode, struct file *file, RETURN(put_user(flags, (int *)arg)); } case EXT3_IOC_SETFLAGS: { - struct mdc_op_data op_data; + struct mdc_op_data *op_data; struct iattr attr; struct obdo *oa; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; @@ -1229,32 +1426,39 @@ int ll_iocontrol(struct inode *inode, struct file *file, if (!oa) RETURN(-ENOMEM); - ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); + OBD_ALLOC(op_data, sizeof(*op_data)); + if (op_data == NULL) { + obdo_free(oa); + RETURN(-ENOMEM); + } + ll_prepare_mdc_data(op_data, inode, NULL, NULL, 0, 0); memset(&attr, 0x0, sizeof(attr)); attr.ia_attr_flags = flags; attr.ia_valid |= ATTR_ATTR_FLAG; - rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, - &attr, NULL, 0, NULL, 0, &req); + rc = md_setattr(sbi->ll_md_exp, op_data, + &attr, NULL, 0, NULL, 0, &req); + OBD_FREE(op_data, sizeof(*op_data)); if (rc) { ptlrpc_req_finished(req); if (rc != -EPERM && rc != -EACCES) - CERROR("mdc_setattr fails: rc = %d\n", rc); + CERROR("md_setattr fails: rc = %d\n", rc); obdo_free(oa); RETURN(rc); } ptlrpc_req_finished(req); oa->o_id = lsm->lsm_object_id; + oa->o_gr = lsm->lsm_object_gr; oa->o_flags = flags; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP; - rc = obd_setattr(sbi->ll_osc_exp, oa, lsm, NULL); + rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL); obdo_free(oa); if (rc) { if (rc != -EPERM && rc != -EACCES) - CERROR("mdc_setattr fails: rc = %d\n", rc); + CERROR("md_setattr fails: rc = %d\n", rc); RETURN(rc); } @@ -1280,38 +1484,42 @@ int ll_iocontrol(struct inode *inode, struct file *file, RETURN(0); } +/* this is only called in the case of forced umount. */ void ll_umount_begin(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); struct obd_device *obd; struct obd_ioctl_data ioc_data = { 0 }; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:\n"); - - obd = class_exp2obd(sbi->ll_mdc_exp); + + CDEBUG(D_VFSTRACE, "VFS Op: superblock %p count %d active %d\n", sb, + sb->s_count, atomic_read(&sb->s_active)); + + obd = class_exp2obd(sbi->ll_md_exp); if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", - sbi->ll_mdc_exp->exp_handle.h_cookie); + sbi->ll_md_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data, - &ioc_data, NULL); + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, + sizeof(ioc_data), &ioc_data, NULL); - obd = class_exp2obd(sbi->ll_osc_exp); + obd = class_exp2obd(sbi->ll_dt_exp); if (obd == NULL) { CERROR("Invalid LOV connection handle "LPX64"\n", - sbi->ll_osc_exp->exp_handle.h_cookie); + sbi->ll_dt_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_osc_exp, sizeof ioc_data, - &ioc_data, NULL); + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, + sizeof(ioc_data), &ioc_data, NULL); - /* Really, we'd like to wait until there are no requests outstanding, + /* + * really, we'd like to wait until there are no requests outstanding, * and then continue. For now, we just invalidate the requests, * schedule, and hope. */ @@ -1320,25 +1528,29 @@ void ll_umount_begin(struct super_block *sb) EXIT; } -int ll_prep_inode(struct obd_export *exp, struct inode **inode, - struct ptlrpc_request *req, int offset,struct super_block *sb) +int ll_prep_inode(struct obd_export *dt_exp, struct obd_export *md_exp, + struct inode **inode, struct ptlrpc_request *req, + int offset, struct super_block *sb) { struct lustre_md md; int rc = 0; - rc = mdc_req2lustre_md(req, offset, exp, &md); + rc = mdc_req2lustre_md(md_exp, req, offset, dt_exp, &md); if (rc) RETURN(rc); if (*inode) { - ll_update_inode(*inode, md.body, md.lsm); + ll_update_inode(*inode, &md); } else { LASSERT(sb); - *inode = ll_iget(sb, md.body->ino, &md); + *inode = ll_iget(sb, id_ino(&md.body->id1), &md); if (*inode == NULL || is_bad_inode(*inode)) { /* free the lsm if we allocated one above */ if (md.lsm != NULL) - obd_free_memmd(exp, &md.lsm); + obd_free_memmd(dt_exp, &md.lsm); + if (md.mea != NULL) + obd_free_memmd(md_exp, + (struct lov_stripe_md**)&md.mea); rc = -ENOMEM; CERROR("new_inode -fatal: rc %d\n", rc); } @@ -1346,3 +1558,28 @@ int ll_prep_inode(struct obd_export *exp, struct inode **inode, RETURN(rc); } + +int ll_get_fid(struct obd_export *exp, struct lustre_id *idp, + char *filename, struct lustre_id *ret) +{ + struct ptlrpc_request *request = NULL; + struct mds_body *body; + int rc; + + rc = md_getattr_lock(exp, idp, filename, strlen(filename) + 1, + 0, 0, &request); + if (rc < 0) { + CDEBUG(D_INFO, "md_getattr_lock failed on %s: rc %d\n", + filename, rc); + return rc; + } + + body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); + LASSERT(body != NULL); + LASSERT_REPSWABBED(request, 0); + + *ret = body->id1; + ptlrpc_req_finished(request); + + return rc; +}