From c7c974b3664a811d26a970d74aec595015f19762 Mon Sep 17 00:00:00 2001 From: adilger Date: Sat, 14 Feb 2004 01:31:05 +0000 Subject: [PATCH] Update b_eq from HEAD (20040213_1644) (b_bug974 landing). Eric Mei, per your comment in osc_announce_cached(), we should only be sending OBD_MD_FLGRANT if the client is caching data. If (AFAIK) liblustre isn't doing any client-side write cache then it shouldn't be requesting any grant from the OST. This isn't critical, since the OST _should_ stop giving out grant when the client isn't consuming it, but it would be good to verify. --- lustre/llite/llite_lib.c | 54 +++--- lustre/lov/lov_internal.h | 2 + lustre/obdfilter/filter_internal.h | 11 +- lustre/obdfilter/filter_io.c | 360 +++++++++++++++++++++++++++++++++++-- 4 files changed, 388 insertions(+), 39 deletions(-) diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 49ddb12..e5801a0 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -43,7 +43,7 @@ extern struct super_operations ll_super_operations; #define log2(n) ffz(~(n)) #endif -struct ll_sb_info *lustre_init_sbi(struct super_block *sb) +struct ll_sb_info *lustre_init_sbi(struct super_block *sb) { struct ll_sb_info *sbi = NULL; class_uuid_t uuid; @@ -65,7 +65,7 @@ struct ll_sb_info *lustre_init_sbi(struct super_block *sb) RETURN(sbi); } -void lustre_free_sbi(struct super_block *sb) +void lustre_free_sbi(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); ENTRY; @@ -126,8 +126,8 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) sb->s_blocksize_bits = log2(osfs.os_bsize); sb->s_magic = LL_SUPER_MAGIC; sb->s_maxbytes = PAGE_CACHE_MAXBYTES; - - devno = get_uuid2int(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid, + + devno = get_uuid2int(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid, strlen(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid)); sb->s_dev = devno; @@ -159,7 +159,7 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) sb->s_op = &lustre_super_operations; - /* make root inode + /* make root inode * XXX: move this to after cbd setup? */ err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); @@ -436,14 +436,14 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, err = class_process_config(&lcfg); if (err < 0) GOTO(out_detach, err); - + obd = class_name2obd(name); if (obd == NULL) GOTO(out_cleanup, err = -EINVAL); /* Disable initial recovery on this import */ - err = obd_set_info(obd->obd_self_export, - strlen("initial_recov"), "initial_recov", + err = obd_set_info(obd->obd_self_export, + strlen("initial_recov"), "initial_recov", sizeof(allow_recov), &allow_recov); if (err) GOTO(out_cleanup, err); @@ -453,9 +453,9 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, err); GOTO(out_cleanup, err); } - + exp = class_conn2export(&mdc_conn); - + ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); rc = class_config_parse_llog(ctxt, profile, cfg); if (rc) { @@ -496,7 +496,7 @@ out_del_conn: out: if (rc == 0) rc = err; - + RETURN(rc); } @@ -527,15 +527,15 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) } OBD_ALLOC(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); - if (sbi->ll_lmd == NULL) + if (sbi->ll_lmd == NULL) GOTO(out_free, err = -ENOMEM); memcpy(sbi->ll_lmd, lmd, sizeof(*lmd)); /* generate a string unique to this super, let's try the address of the super itself.*/ - len = (sizeof(sb) * 2) + 1; + len = (sizeof(sb) * 2) + 1; OBD_ALLOC(sbi->ll_instance, len); - if (sbi->ll_instance == NULL) + if (sbi->ll_instance == NULL) GOTO(out_free, err = -ENOMEM); sprintf(sbi->ll_instance, "%p", sb); @@ -556,13 +556,13 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) } if (osc) OBD_FREE(osc, strlen(osc) + 1); - OBD_ALLOC(osc, strlen(lprof->lp_osc) + + OBD_ALLOC(osc, strlen(lprof->lp_osc) + strlen(sbi->ll_instance) + 2); sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance); if (mdc) OBD_FREE(mdc, strlen(mdc) + 1); - OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + + OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + strlen(sbi->ll_instance) + 2); sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance); } @@ -576,9 +576,9 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) CERROR("no mdc\n"); GOTO(out_free, err = -EINVAL); } - + err = lustre_common_fill_super(sb, mdc, osc); - + if (err) GOTO(out_free, err); @@ -605,9 +605,9 @@ out_free: OBD_ALLOC(cln_prof, len); sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile); - err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, + err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, 0); - if (err < 0) + if (err < 0) CERROR("Unable to process log: %s\n", cln_prof); OBD_FREE(cln_prof, len); OBD_FREE(sbi->ll_instance, strlen(sbi->ll_instance)+ 1); @@ -619,11 +619,11 @@ out_free: goto out_dev; } /* lustre_fill_super */ -static void lustre_manual_cleanup(struct ll_sb_info *sbi) +static void lustre_manual_cleanup(struct ll_sb_info *sbi) { struct lustre_cfg lcfg; struct obd_device *obd; - int next = 0; + int next = 0; while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL) { @@ -644,7 +644,7 @@ static void lustre_manual_cleanup(struct ll_sb_info *sbi) } } - if (sbi->ll_lmd != NULL) + if (sbi->ll_lmd != NULL) class_del_profile(sbi->ll_lmd->lmd_profile); } @@ -660,7 +660,7 @@ void lustre_put_super(struct super_block *sb) if (obd) force_umount = obd->obd_no_recov; obd = NULL; - + lustre_common_put_super(sb); if (sbi->ll_lmd != NULL) { @@ -860,7 +860,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* from sys_utime() */ if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { if (current->fsuid != inode->i_uid && - (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0) + (rc=ll_permission(inode,MAY_WRITE,NULL))!=0) RETURN(rc); } else { /* from inode_change_ok() */ @@ -878,7 +878,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * If we don't we can race with other i_size updaters on our node, like * ll_file_read. We can also race with i_size propogation to other * nodes through dirtying and writeback of final cached pages. This - * last one is especially bad for racing o_append users on other + * last one is especially bad for racing o_append users on other * nodes. */ if (ia_valid & ATTR_SIZE) { struct ldlm_extent extent = { .start = attr->ia_size, @@ -951,6 +951,8 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, RETURN(rc); } + osfs->os_type = sb->s_magic; + CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 6c26a16..a565f51 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -41,6 +41,8 @@ int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_bytes); int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_user_md *lump); +int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_user_md *lump); int lov_getstripe(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_user_md *lump); diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index ce7b4a3..06d852c 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -48,6 +48,8 @@ #define FILTER_INCOMPAT_GROUPS 0x00000001 #define FILTER_INCOMPAT_SUPP (FILTER_INCOMPAT_GROUPS) +#define FILTER_GRANT_CHUNK (2ULL*1024*1024) + /* Data stored per server at the head of the last_rcvd file. In le32 order. * Try to keep this the same as mds_server_data so we might one day merge. */ struct filter_server_data { @@ -128,15 +130,20 @@ void flip_into_page_cache(struct inode *inode, struct page *new_page); int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti); +obd_size filter_grant_space_left(struct obd_export *exp); +long filter_grant(struct obd_export *exp, obd_size current_grant, + obd_size want, obd_size fs_space_left); +void filter_grant_commit(struct obd_export *exp, int niocount, + struct niobuf_local *res); /* filter_log.c */ struct ost_filterdata { __u32 ofd_epoch; }; -int filter_log_sz_change(struct llog_handle *cathandle, +int filter_log_sz_change(struct llog_handle *cathandle, struct ll_fid *mds_fid, __u32 io_epoch, - struct llog_cookie *logcookie, + struct llog_cookie *logcookie, struct inode *inode); //int filter_get_catalog(struct obd_device *); void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index f4581bb..c2867b5 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -99,12 +99,171 @@ err_page: return lnb->rc; } +/* Grab the dirty and seen grant announcements from the incoming obdo. + * We will later calculate the clients new grant and return it. + * Caller must hold osfs lock */ +static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) +{ + struct filter_export_data *fed; + struct obd_device *obd = exp->exp_obd; + ENTRY; + + if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) != + (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) { + oa->o_valid &= ~OBD_MD_FLGRANT; + EXIT; + return; + } + + fed = &exp->exp_filter_data; + + /* Add some margin, since there is a small race if other RPCs arrive + * out-or-order and have already consumed some grant. We want to + * leave this here in case there is a large error in accounting. */ + CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? + D_ERROR : D_CACHE, + "%s: cli %s reports granted: "LPU64" dropped: %u, local: %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, oa->o_grant, + oa->o_dropped, fed->fed_grant); + + /* Update our accounting now so that statfs takes it into account. + * Note that fed_dirty is only approximate and can become incorrect + * if RPCs arrive out-of-order. No important calculations depend + * on fed_dirty however. */ + obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty; + if (fed->fed_grant < oa->o_dropped) { + CERROR("%s: cli %s reports %u dropped > fed_grant %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, + oa->o_dropped, fed->fed_grant); + oa->o_dropped = 0; + } + if (obd->u.filter.fo_tot_granted < oa->o_dropped) { + CERROR("%s: cli %s reports %u dropped > tot_granted "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + oa->o_dropped, obd->u.filter.fo_tot_granted); + oa->o_dropped = 0; + } + obd->u.filter.fo_tot_granted -= oa->o_dropped; + fed->fed_grant -= oa->o_dropped; + fed->fed_dirty = oa->o_dirty; + EXIT; +} + +#define GRANT_FOR_LLOG 16 + +/* Figure out how much space is available between what we've granted + * and what remains in the filesystem. Compensate for ext3 indirect + * block overhead when computing how much free space is left ungranted. + * + * Caller must hold obd_osfs_lock. */ +obd_size filter_grant_space_left(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0; + int rc, statfs_done = 0; + + if (time_before(obd->obd_osfs_age, jiffies - HZ)) { +restat: + rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1); + if (rc) /* N.B. statfs can't really fail */ + RETURN(0); + statfs_done = 1; + } + + avail = obd->obd_osfs.os_bavail; + left = avail - (avail >> (blockbits - 3)); /* (d)indirect */ + if (left > GRANT_FOR_LLOG) { + left = (left - GRANT_FOR_LLOG) << blockbits; + } else { + left = 0 /* << blockbits */; + } + + if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) { + CDEBUG(D_CACHE, "fs has no space left and statfs too old\n"); + goto restat; + } + + if (left >= tot_granted) { + left -= tot_granted; + } else { + static unsigned long next; + if (left < tot_granted - obd->u.filter.fo_tot_pending && + time_after(jiffies, next)) { + spin_unlock(&obd->obd_osfs_lock); + CERROR("%s: cli %s granted "LPU64" more than available " + LPU64" and pending "LPU64"\n", obd->obd_name, + exp->exp_client_uuid.uuid, tot_granted, left, + obd->u.filter.fo_tot_pending); + if (next == 0) + portals_debug_dumplog(); + next = jiffies + 20 * HZ; + spin_lock(&obd->obd_osfs_lock); + } + left = 0; + } + + CDEBUG(D_CACHE, "%s: cli %s free: "LPU64" avail: "LPU64" grant "LPU64 + " left: "LPU64" pending: "LPU64"\n", obd->obd_name, + exp->exp_client_uuid.uuid, obd->obd_osfs.os_bfree << blockbits, + avail << blockbits, tot_granted, left, + obd->u.filter.fo_tot_pending); + + return left; +} + +/* Calculate how much grant space to allocate to this client, based on how + * much space is currently free and how much of that is already granted. + * + * Caller must hold obd_osfs_lock. */ +long filter_grant(struct obd_export *exp, obd_size current_grant, + obd_size want, obd_size fs_space_left) +{ + struct obd_device *obd = exp->exp_obd; + struct filter_export_data *fed = &exp->exp_filter_data; + int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + __u64 grant = 0; + + /* Grant some fraction of the client's requested grant space so that + * they are not always waiting for write credits (not all of it to + * avoid overgranting in face of multiple RPCs in flight). This + * essentially will be able to control the OSC_MAX_RIF for a client. + * + * If we do have a large disparity and multiple RPCs in flight we + * might grant "too much" but that's OK because it means we are + * dirtying a lot on the client and will likely use it up quickly. */ + if (current_grant < want) { + grant = min((want >> blockbits) / 2, + (fs_space_left >> blockbits) / 8); + grant <<= blockbits; + + if (grant) { + if (grant > FILTER_GRANT_CHUNK) + grant = FILTER_GRANT_CHUNK; + + obd->u.filter.fo_tot_granted += grant; + fed->fed_grant += grant; + } + } + + CDEBUG(D_CACHE,"%s: cli %s wants: "LPU64" granting: "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, want, grant); + CDEBUG(D_CACHE, + "%s: cli %s tot cached:"LPU64" granted:"LPU64 + " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid, + obd->u.filter.fo_tot_dirty, + obd->u.filter.fo_tot_granted, obd->obd_num_exports); + + return grant; +} + static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, struct niobuf_local *res, struct obd_trans_info *oti) { + struct obd_device *obd = exp->exp_obd; struct obd_run_ctxt saved; struct obd_ioobj *o; struct niobuf_remote *rnb; @@ -119,6 +278,21 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, /* We are currently not supporting multi-obj BRW_READ RPCS at all. * When we do this function's dentry cleanup will need to be fixed */ LASSERT(objcount == 1); + LASSERT(obj->ioo_bufcnt > 0); + + if (oa && oa->o_valid & OBD_MD_FLGRANT) { + spin_lock(&obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + +#if 0 + /* Reads do not increase grants */ + oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty, + filter_grant_space_left(exp)); +#else + oa->o_grant = 0; +#endif + spin_unlock(&obd->obd_osfs_lock); + } OBD_ALLOC(fso, objcount * sizeof(*fso)); if (fso == NULL) @@ -130,7 +304,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, for (i = 0, o = obj; i < objcount; i++, o++) { LASSERT(o->ioo_bufcnt); - dentry = filter_oa2dentry(exp->exp_obd, oa); + dentry = filter_oa2dentry(obd, oa); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); @@ -160,7 +334,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; - lnb->start = jiffies; if (inode->i_size <= rnb->offset) { /* If there's no more data, abort early. @@ -195,8 +368,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", (jiffies - now)); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES, - tot_bytes); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); while (lnb-- > res) { rc = filter_finish_page_read(lnb); if (rc) { @@ -235,6 +407,111 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, return rc; } +/* When clients have dirtied as much space as they've been granted they + * fall through to sync writes. These sync writes haven't been expressed + * in grants and need to error with ENOSPC when there isn't room in the + * filesystem for them after grants are taken into account. However, + * writeback of the dirty data that was already granted space can write + * right on through. + * + * Caller must hold obd_osfs_lock. */ +static int filter_grant_check(struct obd_export *exp, int objcount, + struct fsfilt_objinfo *fso, int niocount, + struct niobuf_remote *rnb, + struct niobuf_local *lnb, obd_size *left, + struct inode *inode) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize; + unsigned long used = 0, ungranted = 0, using; + int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE; + + for (obj = 0; obj < objcount; obj++) { + for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) { + int tmp, bytes; + + /* FIXME: this is calculated with PAGE_SIZE on client */ + bytes = rnb[n].len; + bytes += rnb[n].offset & (blocksize - 1); + tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1); + if (tmp) + bytes += blocksize - tmp; + + if (rnb[n].flags & OBD_BRW_FROM_GRANT) { + if (fed->fed_grant < used + bytes) { + CDEBUG(D_CACHE, + "%s: cli %s claims %ld+%d GRANT," + " no such grant %lu, idx %d\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, + used, bytes, fed->fed_grant, n); + mask = D_ERROR; + } else { + used += bytes; + rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].lnb_grant_used = bytes; + CDEBUG(0, "idx %d used=%lu\n", n, used); + rc = 0; + continue; + } + } + if (*left > ungranted) { + /* if enough space, pretend it was granted */ + ungranted += bytes; + rnb[n].flags |= OBD_BRW_GRANTED; + CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted); + rc = 0; + continue; + } + + /* We can't check for already-mapped blocks here, as + * it requires dropping the osfs lock to do the bmap. + * Instead, we return ENOSPC and in that case we need + * to go through and verify if all of the blocks not + * marked BRW_GRANTED are already mapped and we can + * ignore this error. */ + lnb[n].rc = -ENOSPC; + rnb[n].flags &= OBD_BRW_GRANTED; + CDEBUG(D_CACHE, "%s: cli %s idx %d no space for %d\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, n, bytes); + } + } + + /* Now substract what client have used already. We don't subtract + * this from the tot_granted yet, so that other client's can't grab + * that space before we have actually allocated our blocks. That + * happens in filter_grant_commit() after the writes are done. */ + *left -= ungranted; + fed->fed_grant -= used; + fed->fed_pending += used; + exp->exp_obd->u.filter.fo_tot_pending += used; + + CDEBUG(mask, + "%s: cli %s used: %lu ungranted: %lu grant: %lu dirty: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, used, + ungranted, fed->fed_grant, fed->fed_dirty); + + /* Rough calc in case we don't refresh cached statfs data */ + using = (used + ungranted + 1 ) >> + exp->exp_obd->u.filter.fo_sb->s_blocksize_bits; + if (exp->exp_obd->obd_osfs.os_bavail > using) + exp->exp_obd->obd_osfs.os_bavail -= using; + else + exp->exp_obd->obd_osfs.os_bavail = 0; + + if (fed->fed_dirty < used) { + CERROR("%s: cli %s claims used %lu > fed_dirty %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, + used, fed->fed_dirty); + used = fed->fed_dirty; + } + exp->exp_obd->u.filter.fo_tot_dirty -= used; + fed->fed_dirty -= used; + + return rc; +} + static int filter_start_page_write(struct inode *inode, struct niobuf_local *lnb) { @@ -272,11 +549,12 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, { struct obd_run_ctxt saved; struct niobuf_remote *rnb; - struct niobuf_local *lnb = NULL; + struct niobuf_local *lnb; struct fsfilt_objinfo fso; struct dentry *dentry; - int rc = 0, i, tot_bytes = 0; + obd_size left; unsigned long now = jiffies; + int rc = 0, i, tot_bytes = 0, cleanup_phase = 1; ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); @@ -305,25 +583,47 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", (jiffies - now)); + spin_lock(&exp->exp_obd->obd_osfs_lock); + if (oa) + filter_grant_incoming(exp, oa); + cleanup_phase = 0; + + left = filter_grant_space_left(exp); + + rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res, + &left, dentry->d_inode); + if (oa && oa->o_valid & OBD_MD_FLGRANT) + oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); + + spin_unlock(&exp->exp_obd->obd_osfs_lock); + + if (rc) { + f_dput(dentry); + GOTO(cleanup, rc); + } + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, lnb++, rnb++) { + /* We still set up for ungranted pages so that granted pages + * can be written to disk as they were promised, and portals + * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; - lnb->start = jiffies; rc = filter_start_page_write(dentry->d_inode, lnb); if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@" - LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, + CDEBUG(D_ERROR, "page err %u@"LPU64" %u/%u %p: rc %d\n", + lnb->len, lnb->offset, i, obj->ioo_bufcnt, dentry, rc); while (lnb-- > res) __free_pages(lnb->page, 0); f_dput(dentry); GOTO(cleanup, rc); } - tot_bytes += lnb->len; + if (lnb->rc == 0) + tot_bytes += lnb->len; } if (time_after(jiffies, now + 15 * HZ)) @@ -336,6 +636,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, tot_bytes); EXIT; cleanup: + switch(cleanup_phase) { + case 1: + spin_lock(&exp->exp_obd->obd_osfs_lock); + if (oa) + filter_grant_incoming(exp, oa); + spin_unlock(&exp->exp_obd->obd_osfs_lock); + default: ; + } pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); return rc; } @@ -432,7 +740,37 @@ void flip_into_page_cache(struct inode *inode, struct page *new_page) } while (rc != 0); } -/* XXX needs to trickle its oa down */ +void filter_grant_commit(struct obd_export *exp, int niocount, + struct niobuf_local *res) +{ + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct niobuf_local *lnb = res; + unsigned long pending = 0; + int i; + + spin_lock(&exp->exp_obd->obd_osfs_lock); + for (i = 0, lnb = res; i < niocount; i++, lnb++) + pending += lnb->lnb_grant_used; + + LASSERTF(exp->exp_filter_data.fed_pending >= pending, + "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + exp->exp_filter_data.fed_pending, pending); + exp->exp_filter_data.fed_pending -= pending; + LASSERTF(filter->fo_tot_granted >= pending, + "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + exp->exp_obd->u.filter.fo_tot_granted, pending); + filter->fo_tot_granted -= pending; + LASSERTF(filter->fo_tot_pending >= pending, + "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + filter->fo_tot_pending, pending); + filter->fo_tot_pending -= pending; + + spin_unlock(&exp->exp_obd->obd_osfs_lock); +} + int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti) -- 1.8.3.1