X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter_io.c;h=41659a442ffd820317479ee6121a7771398a989a;hb=1ad4ea7c1f88ac37b847942b4f39e51b11d76443;hp=3900ad15bc8b668f92f4490b98c97d30b1393099;hpb=3de901fceee79de12a31428bcc6ba3a00f10d1fe;p=fs%2Flustre-release.git diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 3900ad1..41659a4 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -33,70 +33,53 @@ #include #include +#include +#include #include "filter_internal.h" -static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb) -{ - struct address_space *mapping = inode->i_mapping; - struct page *page; - unsigned long index = lnb->offset >> PAGE_SHIFT; - int rc; - - page = grab_cache_page(mapping, index); /* locked page */ - if (page == NULL) - return lnb->rc = -ENOMEM; +int *obdfilter_created_scratchpad; - LASSERT(page->mapping == mapping); +static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode, + struct niobuf_local *lnb) - lnb->page = page; - - if (inode->i_size < lnb->offset + lnb->len - 1) - lnb->rc = inode->i_size - lnb->offset; - else - lnb->rc = lnb->len; - - if (PageUptodate(page)) { - unlock_page(page); - return 0; +{ + struct page *page; + ENTRY; + + page = alloc_pages(GFP_HIGHUSER, 0); + if (page == NULL) { + CERROR("no memory for a temp page\n"); + lnb->rc = -ENOMEM; + RETURN(-ENOMEM); } - rc = mapping->a_ops->readpage(NULL, page); - if (rc < 0) { - CERROR("page index %lu, rc = %d\n", index, rc); - lnb->page = NULL; - page_cache_release(page); - return lnb->rc = rc; +#if 0 + POISON_PAGE(page, 0xf1); + if (lnb->len != PAGE_SIZE) { + memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len); + kunmap(page); } +#endif + page->index = lnb->offset >> PAGE_SHIFT; - return 0; + lnb->page = page; + + RETURN(0); } -static int filter_finish_page_read(struct niobuf_local *lnb) +void filter_free_dio_pages(int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf_local *res) { - if (lnb->page == NULL) - return 0; - - if (PageUptodate(lnb->page)) - return 0; + int i, j; - wait_on_page(lnb->page); - if (!PageUptodate(lnb->page)) { - CERROR("page index %lu/offset "LPX64" not uptodate\n", - lnb->page->index, lnb->offset); - GOTO(err_page, lnb->rc = -EIO); - } - if (PageError(lnb->page)) { - CERROR("page index %lu/offset "LPX64" has error\n", - lnb->page->index, lnb->offset); - GOTO(err_page, lnb->rc = -EIO); + for (i = 0; i < objcount; i++, obj++) { + for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) { + if (res->page != NULL) { + __free_page(res->page); + res->page = NULL; + } + } } - - return 0; - -err_page: - page_cache_release(lnb->page); - lnb->page = NULL; - return lnb->rc; } /* Grab the dirty and seen grant announcements from the incoming obdo. @@ -106,6 +89,9 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) { struct filter_export_data *fed; struct obd_device *obd = exp->exp_obd; + static unsigned long last_msg; + static int last_count; + int mask = D_CACHE; ENTRY; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); @@ -119,11 +105,20 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) fed = &exp->exp_filter_data; + /* Don't print this to the console the first time it happens, since + * it can happen legitimately on occasion, but only rarely. */ + if (time_after(jiffies, last_msg + 60 * HZ)) { + last_count = 0; + last_msg = jiffies; + } + if ((last_count & (-last_count)) == last_count) + mask = D_WARNING; + last_count++; + /* Add some margin, since there is a small race if other RPCs arrive * out-or-order and have already consumed some grant. We want to * leave this here in case there is a large error in accounting. */ - CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? - D_WARNING : D_CACHE, + CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE, "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant, oa->o_dropped, fed->fed_grant); @@ -272,146 +267,110 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; - struct obd_run_ctxt saved; - struct obd_ioobj *o; + struct lvfs_run_ctxt saved; struct niobuf_remote *rnb; - struct niobuf_local *lnb = NULL; - struct fsfilt_objinfo *fso; - struct dentry *dentry; + struct niobuf_local *lnb; + struct dentry *dentry = NULL; struct inode *inode; - int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0; + void *iobuf = NULL; + int rc = 0, i, tot_bytes = 0; unsigned long now = jiffies; ENTRY; /* We are currently not supporting multi-obj BRW_READ RPCS at all. * When we do this function's dentry cleanup will need to be fixed */ - LASSERT(objcount == 1); - LASSERT(obj->ioo_bufcnt > 0); + LASSERTF(objcount == 1, "%d\n", objcount); + LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt); if (oa && oa->o_valid & OBD_MD_FLGRANT) { spin_lock(&obd->obd_osfs_lock); filter_grant_incoming(exp, oa); -#if 0 - /* Reads do not increase grants */ - oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty, - filter_grant_space_left(exp)); -#else oa->o_grant = 0; -#endif spin_unlock(&obd->obd_osfs_lock); } - OBD_ALLOC(fso, objcount * sizeof(*fso)); - if (fso == NULL) - RETURN(-ENOMEM); - memset(res, 0, niocount * sizeof(*res)); - push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - for (i = 0, o = obj; i < objcount; i++, o++) { - LASSERT(o->ioo_bufcnt); - - dentry = filter_oa2dentry(obd, oa); - if (IS_ERR(dentry)) - GOTO(cleanup, rc = PTR_ERR(dentry)); + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf); + if (rc) + GOTO(cleanup, rc); - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - o->ioo_id); - f_dput(dentry); - GOTO(cleanup, rc = -ENOENT); - } + dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id); + if (IS_ERR(dentry)) + GOTO(cleanup, rc = PTR_ERR(dentry)); - fso[i].fso_dentry = dentry; - fso[i].fso_bufcnt = o->ioo_bufcnt; - } + inode = dentry->d_inode; - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n", - (jiffies - now)); - - for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) { - dentry = fso[i].fso_dentry; - inode = dentry->d_inode; - - for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) { - lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - - if (inode->i_size <= rnb->offset) { - /* If there's no more data, abort early. - * lnb->page == NULL and lnb->rc == 0, so it's - * easy to detect later. */ - break; - } else { - rc = filter_start_page_read(inode, lnb); - } + fsfilt_check_slow(now, obd_timeout, "preprw_read setup"); - if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, - "page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, j, o->ioo_bufcnt, - dentry, rc); - cleanup_phase = 1; - GOTO(cleanup, rc); - } + for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt; + i++, rnb++, lnb++) { + lnb->dentry = dentry; + lnb->offset = rnb->offset; + lnb->len = rnb->len; + lnb->flags = rnb->flags; - tot_bytes += lnb->rc; - if (lnb->rc < lnb->len) { - /* short read, be sure to wait on it */ - lnb++; - break; - } + if ((inode && inode->i_size <= rnb->offset) || inode == NULL) + /* + * if there's no more data, abort early. lnb->page == * + * NULL and lnb->rc == 0, so it's easy to detect later. + */ + break; + + rc = filter_alloc_dio_page(obd, inode, lnb); + if (rc) { + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, + "page err %u@"LPU64" %u/%u %p: rc %d\n", + lnb->len, lnb->offset, i, obj->ioo_bufcnt, + dentry, rc); + GOTO(cleanup, rc); } + + if (inode->i_size < lnb->offset + lnb->len - 1) + lnb->rc = inode->i_size - lnb->offset; + else + lnb->rc = lnb->len; + + tot_bytes += lnb->rc; + + filter_iobuf_add_page(obd, iobuf, inode, lnb->page); } - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", - (jiffies - now)); + fsfilt_check_slow(now, obd_timeout, "start_page_read"); - lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); - while (lnb-- > res) { - rc = filter_finish_page_read(lnb); - if (rc) { - CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len, - lnb->offset, (int)(lnb - res), lnb->dentry, rc); - cleanup_phase = 1; + if (inode != NULL) { + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, + exp, NULL, NULL, NULL); + if (rc) GOTO(cleanup, rc); - } } - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n", - (jiffies - now)); - + lprocfs_counter_add(obd->obd_stats, + LPROC_FILTER_READ_BYTES, tot_bytes); filter_tally_read(&exp->exp_obd->u.filter, res, niocount); EXIT; - - cleanup: - switch (cleanup_phase) { - case 1: - for (lnb = res; lnb < (res + niocount); lnb++) { - if (lnb->page) - page_cache_release(lnb->page); - } - if (res->dentry != NULL) - f_dput(res->dentry); - else - CERROR("NULL dentry in cleanup -- tell CFS\n"); - case 0: - OBD_FREE(fso, objcount * sizeof(*fso)); - pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); +cleanup: + if (rc) { + filter_free_dio_pages(objcount, obj, + niocount, res); + /* + * in other cases (no errors) dentry is released in + * filter_commitrw_read(). + */ + f_dput(dentry); } + + if (iobuf != NULL) + filter_free_iobuf(iobuf); + + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + + if (rc) + CERROR("io error %d\n", rc); + return rc; } @@ -522,25 +481,6 @@ static int filter_grant_check(struct obd_export *exp, int objcount, return rc; } -static int filter_start_page_write(struct inode *inode, - struct niobuf_local *lnb) -{ - struct page *page = alloc_pages(GFP_HIGHUSER, 0); - if (page == NULL) { - CERROR("no memory for a temp page\n"); - RETURN(lnb->rc = -ENOMEM); - } - POISON_PAGE(page, 0xf1); - if (lnb->len != PAGE_SIZE) { - memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len); - kunmap(page); - } - page->index = lnb->offset >> PAGE_SHIFT; - lnb->page = page; - - return 0; -} - /* If we ever start to support multi-object BRW RPCs, we will need to get locks * on mulitple inodes. That isn't all, because there still exists the * possibility of a truncate starting a new transaction while holding the ext3 @@ -557,46 +497,53 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, struct niobuf_local *res, struct obd_trans_info *oti) { - struct obd_run_ctxt saved; + int rc = 0, i, tot_bytes = 0, cleanup_phase = 0; + struct niobuf_local *lnb = res; + struct dentry *dentry = NULL; + unsigned long now = jiffies; + struct lvfs_run_ctxt saved; struct niobuf_remote *rnb; - struct niobuf_local *lnb; struct fsfilt_objinfo fso; - struct dentry *dentry; + struct obd_device *obd; obd_size left; - unsigned long now = jiffies; - int rc = 0, i, tot_bytes = 0, cleanup_phase = 1; + obd_uid uid; + obd_gid gid; + void *iobuf; + ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); memset(res, 0, niocount * sizeof(*res)); - push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, - obj->ioo_id); + rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf); + if (rc) + GOTO(cleanup, rc); + cleanup_phase = 1; + + obd = exp->exp_obd; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + uid = oa->o_valid & OBD_MD_FLUID ? oa->o_uid : 0; + gid = oa->o_valid & OBD_MD_FLGID ? oa->o_gid : 0; + + /* make sure that object is already allocated */ + dentry = filter_crow_object(obd, oa); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - f_dput(dentry); - GOTO(cleanup, rc = -ENOENT); - } + cleanup_phase = 2; fso.fso_dentry = dentry; fso.fso_bufcnt = obj->ioo_bufcnt; - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", - (jiffies - now)); + fsfilt_check_slow(now, obd_timeout, "preprw_write setup"); - spin_lock(&exp->exp_obd->obd_osfs_lock); + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); - cleanup_phase = 0; + + cleanup_phase = 3; left = filter_grant_space_left(exp); @@ -605,64 +552,99 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, if (oa && oa->o_valid & OBD_MD_FLGRANT) oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); - spin_unlock(&exp->exp_obd->obd_osfs_lock); + /* We're finishing using body->oa as an input variable, so reset + * o_valid here. */ + oa->o_valid = 0; - if (rc) { - f_dput(dentry); + spin_unlock(&obd->obd_osfs_lock); + + if (rc) GOTO(cleanup, rc); - } for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, lnb++, rnb++) { /* We still set up for ungranted pages so that granted pages * can be written to disk as they were promised, and portals - * needs to keep the pages all aligned properly. */ + * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; - rc = filter_start_page_write(dentry->d_inode, lnb); + rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb); if (rc) { CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, i, obj->ioo_bufcnt, dentry, rc); - while (lnb-- > res) - __free_pages(lnb->page, 0); - f_dput(dentry); GOTO(cleanup, rc); } + cleanup_phase = 4; + + /* If the filter writes a partial page, then has the file + * extended, the client will read in the whole page. the + * filter has to be careful to zero the rest of the partial + * page on disk. we do it by hand for partial extending + * writes, send_bio() is responsible for zeroing pages when + * asked to read unmapped blocks -- brw_kiovec() does this. */ + if (lnb->len != PAGE_SIZE) { + if (lnb->offset + lnb->len < dentry->d_inode->i_size) { + filter_iobuf_add_page(obd, iobuf, dentry->d_inode, + lnb->page); + } else { + memset(kmap(lnb->page) + lnb->len, 0, + PAGE_SIZE - lnb->len); + kunmap(lnb->page); + } + } if (lnb->rc == 0) tot_bytes += lnb->len; } - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_write: %lu jiffies\n", - (jiffies - now)); + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, + NULL, NULL, NULL); + + fsfilt_check_slow(now, obd_timeout, "start_page_write"); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes); EXIT; cleanup: switch(cleanup_phase) { + case 4: + if (rc) + filter_free_dio_pages(objcount, obj, niocount, res); + case 3: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + filter_free_iobuf(iobuf); + case 2: + if (rc && dentry && !IS_ERR(dentry)) + f_dput(dentry); + break; case 1: - spin_lock(&exp->exp_obd->obd_osfs_lock); + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); - spin_unlock(&exp->exp_obd->obd_osfs_lock); - default: ; + spin_unlock(&obd->obd_osfs_lock); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + filter_free_iobuf(iobuf); + break; + default:; + } - pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - return rc; + RETURN(rc); } int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, struct niobuf_local *res, - struct obd_trans_info *oti) + struct obd_trans_info *oti, struct lustre_capa *capa) { + int rc; + + rc = filter_verify_capa(cmd, exp, capa); + if (rc) + return rc; + if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, niocount, nb, res, oti); @@ -675,34 +657,37 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, return -EPROTO; } +void filter_release_read_page(struct filter_obd *filter, struct inode *inode, + struct page *page) +{ + int drop = 0; + + if (inode != NULL && + (inode->i_size > filter->fo_readcache_max_filesize)) + drop = 1; + + /* drop from cache like truncate_list_pages() */ + if (drop && !TryLockPage(page)) { + if (page->mapping) + ll_truncate_complete_page(page); + unlock_page(page); + } + page_cache_release(page); +} + static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { - struct obd_ioobj *o; - struct niobuf_local *lnb; - int i, j, drop = 0; + struct inode *inode = NULL; ENTRY; if (res->dentry != NULL) - drop = (res->dentry->d_inode->i_size > - exp->exp_obd->u.filter.fo_readcache_max_filesize); - - for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) { - for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { - if (lnb->page == NULL) - continue; - /* drop from cache like truncate_list_pages() */ - if (drop && !TryLockPage(lnb->page)) { - if (lnb->page->mapping) - ll_truncate_complete_page(lnb->page); - unlock_page(lnb->page); - } - page_cache_release(lnb->page); - } - } + inode = res->dentry->d_inode; + filter_free_dio_pages(objcount, obj, niocount, res); + if (res->dentry != NULL) f_dput(res->dentry); RETURN(rc); @@ -777,19 +762,156 @@ void filter_grant_commit(struct obd_export *exp, int niocount, spin_unlock(&exp->exp_obd->obd_osfs_lock); } +int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj, + int nioo, struct niobuf_remote *rnb) +{ + struct dentry *dentry; + struct lvfs_run_ctxt saved; + struct write_extents *extents = NULL; + int j, rc = 0, numexts = 0, flags = 0; + + ENTRY; + + LASSERT(nioo == 1); + + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + + dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr, + obj->ioo_id); + if (IS_ERR(dentry)) { + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + RETURN (PTR_ERR(dentry)); + } + + if (dentry->d_inode == NULL) { + CERROR("trying to write extents to non-existent file "LPU64"\n", + obj->ioo_id); + GOTO(cleanup, rc = -ENOENT); + } + + flags = fsfilt_get_fs_flags(exp->exp_obd, dentry); + if (!(flags & SM_DO_COW)) { + GOTO(cleanup, rc); + } + OBD_ALLOC(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); + if (!extents) { + CERROR("No Memory\n"); + GOTO(cleanup, rc = -ENOMEM); + } + for (j = 0; j < obj->ioo_bufcnt; j++) { + if (rnb[j].len != 0) { + extents[numexts].w_count = rnb[j].len; + extents[numexts].w_pos = rnb[j].offset; + numexts++; + } + } + rc = fsfilt_do_write_cow(exp->exp_obd, dentry, extents, numexts); + if (rc) { + CERROR("Do cow error id "LPU64" rc:%d \n", + obj->ioo_id, rc); + GOTO(cleanup, rc); + } + +cleanup: + if (extents) { + OBD_FREE(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); + } + f_dput(dentry); + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + RETURN(rc); + +} +int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj, + int niocount, struct niobuf_local *local, int rc) +{ + struct lvfs_run_ctxt saved; + struct dentry *dentry; + struct niobuf_local *lnb; + __u64 offset = 0; + __u32 len = 0; + int i, flags; + + ENTRY; + + LASSERT(nobj == 1); + + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr, + obj->ioo_id); + if (IS_ERR(dentry)) { + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + RETURN (PTR_ERR(dentry)); + } + + if (dentry->d_inode == NULL) { + CERROR("trying to write extents to non-existent file "LPU64"\n", + obj->ioo_id); + GOTO(cleanup, rc = -ENOENT); + } + + flags = fsfilt_get_fs_flags(exp->exp_obd, dentry); + if (!(flags & SM_DO_REC)) { + GOTO(cleanup, rc); + } + + for (i = 0, lnb = local; i < obj->ioo_bufcnt; i++, lnb++) { + if (len == 0) { + offset = lnb->offset; + len = lnb->len; + } else if (lnb->offset == (offset + len)) { + len += lnb->len; + } else { + rc = fsfilt_write_extents(exp->exp_obd, dentry, + offset, len); + if (rc) { + CERROR("write exts off "LPU64" num %u rc:%d\n", + offset, len, rc); + GOTO(cleanup, rc); + } + offset = lnb->offset; + len = lnb->len; + } + } + if (len > 0) { + rc = fsfilt_write_extents(exp->exp_obd, dentry, + offset, len); + if (rc) { + CERROR("write exts off "LPU64" num %u rc:%d\n", + offset, len, rc); + GOTO(cleanup, rc); + } + } +cleanup: + f_dput(dentry); + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +} int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti,int rc) + struct niobuf_local *res, struct obd_trans_info *oti,int ret) { - if (cmd == OBD_BRW_WRITE) - return filter_commitrw_write(exp, oa, objcount, obj, niocount, - res, oti, rc); - if (cmd == OBD_BRW_READ) - return filter_commitrw_read(exp, oa, objcount, obj, niocount, - res, oti, rc); - LBUG(); - return -EPROTO; + int rc = -EPROTO; + struct lustre_id *id = obdo_id(oa); + __u32 len = sizeof(*id); + struct inode * inode = res->dentry->d_inode; + struct super_block * sb = res->dentry->d_sb; + struct obd_device *obd = class_exp2obd(exp); + + if (cmd == OBD_BRW_WRITE) { + rc = filter_commitrw_write(exp, oa, objcount, obj, niocount, + res, oti, ret); + fsfilt_set_info(obd, sb, inode, 10, "file_write", len, (void*)id); + } + else if (cmd == OBD_BRW_READ) { + rc = filter_commitrw_read(exp, oa, objcount, obj, niocount, + res, oti, ret); + fsfilt_set_info(obd, sb, inode, 9, "file_read", len, (void*)id); + } + else + LBUG(); + + return rc; } int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, @@ -810,21 +932,28 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, GOTO(out, ret = -ENOMEM); for (i = 0; i < oa_bufs; i++) { - rnb[i].offset = pga[i].off; + rnb[i].offset = pga[i].disk_offset; rnb[i].len = pga[i].count; } obdo_to_ioobj(oa, &ioo); ioo.ioo_bufcnt = oa_bufs; - ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti); + ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti,NULL); if (ret != 0) GOTO(out, ret); for (i = 0; i < oa_bufs; i++) { - void *virt = kmap(pga[i].pg); - obd_off off = pga[i].off & ~PAGE_MASK; - void *addr = kmap(lnb[i].page); + void *virt; + obd_off off; + void *addr; + + if (lnb[i].page == NULL) + break; + + off = pga[i].disk_offset & ~PAGE_MASK; + virt = kmap(pga[i].pg); + addr = kmap(lnb[i].page); /* 2 kmaps == vanishingly small deadlock opportunity */