From: adilger Date: Wed, 24 Jul 2002 16:43:48 +0000 (+0000) Subject: Make a distinction between bulk callbacks and brw callbacks - the bulk X-Git-Tag: v1_7_100~5243 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=490b7cef6afa51dc26f8ce998fb591cd2fff122f;p=fs%2Flustre-release.git Make a distinction between bulk callbacks and brw callbacks - the bulk callbacks take a ptlrpc_bulk_desc as a parameter, and brw in general has no idea about that (not that we use the brw callback yet). Also add a data parameter for the brw callback, otherwise it is mostly useless. Some minor prep work for fixing the osc_brw_{read,write} request leak. --- diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 4d76a62..ac4a75e 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -209,14 +209,14 @@ struct obd_device { } u; }; +typedef void (*brw_callback_t)(void *); + struct obd_ops { - int (*o_iocontrol)(long cmd, struct lustre_handle *, int len, void *karg, - void *uarg); - int (*o_get_info)(struct lustre_handle *, - obd_count keylen, void *key, + int (*o_iocontrol)(long cmd, struct lustre_handle *, int len, + void *karg, void *uarg); + int (*o_get_info)(struct lustre_handle *, obd_count keylen, void *key, obd_count *vallen, void **val); - int (*o_set_info)(struct lustre_handle *, - obd_count keylen, void *key, + int (*o_set_info)(struct lustre_handle *, obd_count keylen, void *key, obd_count vallen, void *val); int (*o_attach)(struct obd_device *dev, obd_count len, void *data); int (*o_detach)(struct obd_device *dev); @@ -227,10 +227,11 @@ struct obd_ops { int (*o_statfs)(struct lustre_handle *conn, struct statfs *statfs); - int (*o_preallocate)(struct lustre_handle *, obd_count *req, obd_id *ids); - int (*o_create)(struct lustre_handle *conn, struct obdo *oa, + int (*o_preallocate)(struct lustre_handle *, obd_count *req, + obd_id *ids); + int (*o_create)(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea); - int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa, + int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea); int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa); int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa); @@ -238,23 +239,22 @@ struct obd_ops { struct lov_stripe_md *); int (*o_close)(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *); - int (*o_brw)(int rw, struct lustre_handle *conn, - struct lov_stripe_md *md, obd_count oa_bufs, - struct page **buf, - obd_size *count, - obd_off *offset, - obd_flag *flags, - void *); - int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt, struct lov_stripe_md *md, obd_size count, + int (*o_brw)(int rw, struct lustre_handle *conn, + struct lov_stripe_md *md, obd_count oa_bufs, + struct page **buf, obd_size *count, obd_off *offset, + obd_flag *flags, brw_callback_t callback, void * data); + int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt, + struct lov_stripe_md *md, obd_size count, obd_off offset); - int (*o_sync)(struct lustre_handle *conn, struct obdo *tgt, obd_size count, - obd_off offset); + int (*o_sync)(struct lustre_handle *conn, struct obdo *tgt, + obd_size count, obd_off offset); int (*o_migrate)(struct lustre_handle *conn, struct obdo *dst, struct obdo *src, obd_size count, obd_off offset); int (*o_copy)(struct lustre_handle *dstconn, struct obdo *dst, struct lustre_handle *srconn, struct obdo *src, obd_size count, obd_off offset); - int (*o_iterate)(struct lustre_handle *conn, int (*)(obd_id, obd_gr, void *), + int (*o_iterate)(struct lustre_handle *conn, + int (*)(obd_id, obd_gr, void *), obd_id *startid, obd_gr group, void *data); int (*o_preprw)(int cmd, struct lustre_handle *conn, int objcount, struct obd_ioobj *obj, @@ -269,7 +269,8 @@ struct obd_ops { __u32 type, void *cookie, int cookielen, __u32 mode, int *flags, void *cb, void *data, int datalen, struct lustre_handle *lockh); - int (*o_cancel)(struct lustre_handle *, __u32 mode, struct lustre_handle *); + int (*o_cancel)(struct lustre_handle *, __u32 mode, + struct lustre_handle *); }; #endif diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 5469666..906e42c 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -334,7 +334,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn, obd_size *count, obd_off *offset, obd_flag *flags, - void *callback) + brw_callback_t callback, void *data) { int rc; struct obd_export *export; @@ -347,7 +347,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn, } rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, buf, - count, offset, flags, callback); + count, offset, flags, callback, data); RETURN(rc); } diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 46689a1..32b45de 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -45,7 +45,7 @@ static int ll_brw(int rw, struct inode *inode, struct page *page, int create) ENTRY; err = obd_brw(rw, ll_i2obdconn(inode), md, 1, - &page, &count, &offset, &flags, NULL); + &page, &count, &offset, &flags, NULL, NULL); RETURN(err); } /* ll_brw */ @@ -161,11 +161,11 @@ static int ll_commit_write(struct file *file, struct page *page, if (!PageLocked(page)) LBUG(); - CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n", + CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n", from, to, (unsigned long long)count); - err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, - 1, &page, &count, &offset, &flags, NULL); + err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, + 1, &page, &count, &offset, &flags, NULL, NULL); kunmap(page); iattr.ia_size = offset + to; @@ -257,7 +257,7 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, ll_i2obdconn(inode), md, bufs_per_obdo, - iobuf->maplist, count, offset, flags, NULL); + iobuf->maplist, count, offset, flags, NULL, NULL); if (rc == 0) rc = bufs_per_obdo * PAGE_SIZE; @@ -271,10 +271,7 @@ out: int ll_flush_inode_pages(struct inode * inode) { - //int i; - // obd_count num_obdo = 1; obd_count bufs_per_obdo = 0; - struct obdo *oa = NULL; obd_size *count = NULL; obd_off *offset = NULL; obd_flag *flags = NULL; @@ -294,23 +291,19 @@ int ll_flush_inode_pages(struct inode * inode) GOTO(out, err=-ENOMEM); #if 0 - for (i = 0 ; i < bufs_per_obdo ; i++) { + for (i = 0 ; i < bufs_per_obdo ; i++) { count[i] = PAGE_SIZE; offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT; flags[i] = OBD_BRW_CREATE; } - oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); - if (!oa) - RETURN(-ENOMEM); - - err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo, - iobuf->maplist, count, offset, flags); - if (err == 0) + err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, bufs_per_obdo, + iobuf->maplist, count, offset, flags, NULL, NULL); + if (err == 0) err = bufs_per_obdo * 4096; #endif out: - obdo_free(oa); OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo); OBD_FREE(count, sizeof(*count) * bufs_per_obdo); OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo); @@ -326,7 +319,7 @@ struct address_space_operations ll_aops = { direct_IO: ll_direct_IO, #endif sync_page: block_sync_page, - prepare_write: ll_prepare_write, + prepare_write: ll_prepare_write, commit_write: ll_commit_write, bmap: NULL }; diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index c3c50c5..fcf9faf 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -293,8 +293,8 @@ static struct mds_fs_operations mds_extN_fs_ops = { fs_start: mds_extN_start, fs_commit: mds_extN_commit, fs_setattr: mds_extN_setattr, - fs_set_md: mds_extN_set_md, - fs_get_md: mds_extN_get_md, + fs_set_md: mds_extN_set_md, + fs_get_md: mds_extN_get_md, fs_readpage: mds_extN_readpage, fs_delete_inode: mds_extN_delete_inode, cl_delete_inode: clear_inode, diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index ed521eb..73251d2 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -525,7 +525,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, } err = obd_brw(rw, &conn, &smd, j, bufs, counts, offsets, flags, - NULL); + NULL, NULL); EXIT; brw_cleanup: diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 97f8038..f708884 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -712,7 +712,8 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, struct page **pages, obd_size *count, - obd_off *offset, obd_flag *flags, void *callback) + obd_off *offset, obd_flag *flags, + brw_callback_t callback, void *data) { struct obd_run_ctxt saved; struct super_block *sb; @@ -737,7 +738,7 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, file = filter_obj_open(obd, md->lmd_object_id, S_IFREG); if (IS_ERR(file)) GOTO(out, retval = PTR_ERR(file)); - + /* count doubles as retval */ for (pg = 0; pg < oa_bufs; pg++) { CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) " @@ -751,21 +752,23 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, char *buffer; off = offset[pnum]; buffer = kmap(pages[pnum]); - retval = file->f_op->write(file, buffer, count[pnum], &off); + retval = file->f_op->write(file, buffer, count[pnum], + &off); kunmap(pages[pnum]); CDEBUG(D_INODE, "retval %ld\n", retval); } else { loff_t off = offset[pnum]; char *buffer = kmap(pages[pnum]); - + if (off >= file->f_dentry->d_inode->i_size) { memset(buffer, 0, count[pnum]); retval = count[pnum]; } else { - retval = file->f_op->read(file, buffer, count[pnum], &off); + retval = file->f_op->read(file, buffer, + count[pnum], &off); } kunmap(pages[pnum]); - + if (retval != count[pnum]) { filp_close(file, 0); GOTO(out, retval = -EIO); @@ -778,6 +781,7 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, /* ctimes/mtimes will follow with a setattr call */ filp_close(file, 0); + /* XXX: do something with callback if it is set? */ EXIT; out: @@ -1359,8 +1363,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, obd_flag flagw = OBD_BRW_CREATE; page->index = index; - err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, - &page, &brw_count, &brw_offset, &flagr, NULL); + err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &page, + &brw_count, &brw_offset, &flagr, NULL, NULL); if ( err ) { EXIT; @@ -1368,8 +1372,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, } CDEBUG(D_INFO, "Read page %ld ...\n", page->index); - err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, - &page, &brw_count, &brw_offset, &flagw, NULL); + err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &page, + &brw_count, &brw_offset, &flagw, NULL, NULL); /* XXX should handle dst->o_size, dst->o_blocks here */ if ( err ) { diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index d3fdd6f..8594c94 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -422,7 +422,7 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa, } struct osc_brw_cb_data { - bulk_callback_t callback; + brw_callback_t callback; void *cb_data; void *obd_data; size_t obd_size; @@ -432,10 +432,11 @@ struct osc_brw_cb_data { static void unmap_and_decref_bulk_desc(void *data) { struct ptlrpc_bulk_desc *desc = data; - struct list_head *tmp, *next; + struct list_head *tmp; ENTRY; + /* This feels wrong to me. */ - list_for_each_safe(tmp, next, &desc->b_page_list) { + list_for_each(tmp, &desc->b_page_list) { struct ptlrpc_bulk_page *bulk; bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); @@ -455,10 +456,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) CERROR("got signal\n"); if (cb_data->callback) - (cb_data->callback)(desc, cb_data->cb_data); + cb_data->callback(cb_data->cb_data); - if (cb_data->obd_data) - OBD_FREE(cb_data->obd_data, cb_data->obd_size); + OBD_FREE(cb_data->obd_data, cb_data->obd_size); OBD_FREE(cb_data, sizeof(*cb_data)); /* We can't kunmap the desc from interrupt context, so we do it from @@ -473,7 +473,7 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct page **page_array, obd_size *count, obd_off *offset, obd_flag *flags, - bulk_callback_t callback) + brw_callback_t callback, void *data) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; @@ -481,9 +481,10 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, struct ptlrpc_request *request = NULL; struct ptlrpc_bulk_desc *desc = NULL; struct ost_body *body; - int rc, j, size[3] = {sizeof(*body)}; - void *iooptr, *nioptr; struct osc_brw_cb_data *cb_data = NULL; + int rc, size[3] = {sizeof(*body)}; + void *iooptr, *nioptr; + int mapped = 0; ENTRY; size[1] = sizeof(struct obd_ioobj); @@ -507,13 +508,14 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, if (!cb_data) GOTO(out_free, rc = -ENOMEM); cb_data->callback = callback; + cb_data->cb_data = data; desc->b_cb_data = cb_data; - /* XXX end almost identical to brw_write case */ + /* end almost identical to brw_write case */ iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); ost_pack_ioo(&iooptr, md, page_count); - for (j = 0; j < page_count; j++) { + for (mapped = 0; mapped < page_count; mapped++) { struct ptlrpc_bulk_page *bulk; bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) @@ -523,11 +525,11 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, bulk->b_xid = ++connection->c_xid_out; spin_unlock(&connection->c_lock); - bulk->b_buf = kmap(page_array[j]); - bulk->b_page = page_array[j]; + bulk->b_buf = kmap(page_array[mapped]); + bulk->b_page = page_array[mapped]; bulk->b_buflen = PAGE_SIZE; - ost_pack_niobuf(&nioptr, offset[j], count[j], - flags[j], bulk->b_xid); + ost_pack_niobuf(&nioptr, offset[mapped], count[mapped], + flags[mapped], bulk->b_xid); } /* @@ -561,8 +563,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, /* Clean up on error. */ out_unmap: - for (j = 0; j < desc->b_page_count; j++) - kunmap(page_array[j]); + while (mapped-- > 0) + kunmap(page_array[mapped]); out_free: if (cb_data) OBD_FREE(cb_data, sizeof(*cb_data)); @@ -575,7 +577,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct page **pagearray, obd_size *count, obd_off *offset, obd_flag *flags, - bulk_callback_t callback) + brw_callback_t callback, void *data) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; @@ -588,6 +590,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct osc_brw_cb_data *cb_data = NULL; int rc, j, size[3] = {sizeof(*body)}; void *iooptr, *nioptr; + int mapped = 0; ENTRY; size[1] = sizeof(struct obd_ioobj); @@ -615,22 +618,26 @@ static int osc_brw_write(struct lustre_handle *conn, if (!cb_data) GOTO(out_free, rc = -ENOMEM); cb_data->callback = callback; + cb_data->cb_data = data; desc->b_cb_data = cb_data; - /* XXX end almost identical to brw_read case */ - cb_data->obd_data = local; - cb_data->obd_size = page_count * sizeof(*local); iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); ost_pack_ioo(&iooptr, md, page_count); - for (j = 0; j < page_count; j++) { - local[j].addr = kmap(pagearray[j]); - local[j].offset = offset[j]; - local[j].len = count[j]; - ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0); + /* end almost identical to brw_read case */ + + cb_data->obd_data = local; + cb_data->obd_size = page_count * sizeof(*local); + + for (mapped = 0; mapped < page_count; mapped++) { + local[mapped].addr = kmap(pagearray[mapped]); + local[mapped].offset = offset[mapped]; + local[mapped].len = count[mapped]; + ost_pack_niobuf(&nioptr, offset[mapped], count[mapped], + flags[mapped], 0); } - size[1] = page_count * sizeof(struct niobuf_remote); + size[1] = page_count * sizeof(*remote); request->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(request); rc = ptlrpc_check_status(request, rc); @@ -641,26 +648,24 @@ static int osc_brw_write(struct lustre_handle *conn, if (!nioptr) GOTO(out_unmap, rc = -EINVAL); - if (request->rq_repmsg->buflens[1] != - page_count * sizeof(struct niobuf_remote)) { + if (request->rq_repmsg->buflens[1] != size[1]) { CERROR("buffer length wrong (%d vs. %d)\n", - request->rq_repmsg->buflens[1], - page_count * sizeof(struct niobuf_remote)); + request->rq_repmsg->buflens[1], size[1]); GOTO(out_unmap, rc = -EINVAL); } for (j = 0; j < page_count; j++) { - struct ptlrpc_bulk_page *page; + struct ptlrpc_bulk_page *bulk; ost_unpack_niobuf(&nioptr, &remote); - page = ptlrpc_prep_bulk_page(desc); - if (!page) + bulk = ptlrpc_prep_bulk_page(desc); + if (!bulk) GOTO(out_unmap, rc = -ENOMEM); - page->b_buf = (void *)(unsigned long)local[j].addr; - page->b_buflen = local[j].len; - page->b_xid = remote->xid; + bulk->b_buf = (void *)(unsigned long)local[j].addr; + bulk->b_buflen = local[j].len; + bulk->b_xid = remote->xid; } if (desc->b_page_count != page_count) @@ -691,14 +696,12 @@ static int osc_brw_write(struct lustre_handle *conn, /* Clean up on error. */ out_unmap: - for (j = 0; j < page_count; j++) - kunmap(pagearray[j]); + while (mapped-- > 0) + kunmap(pagearray[mapped]); out_free: - if (cb_data) - OBD_FREE(cb_data, sizeof(*cb_data)); - if (local) - OBD_FREE(local, page_count * sizeof(*local)); + OBD_FREE(cb_data, sizeof(*cb_data)); + OBD_FREE(local, page_count * sizeof(*local)); ptlrpc_free_bulk(desc); ptlrpc_req_finished(request); return rc; @@ -706,18 +709,15 @@ static int osc_brw_write(struct lustre_handle *conn, static int osc_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, - struct page **page_array, - obd_size *count, - obd_off *offset, - obd_flag *flags, - void *callback) + struct page **page_array, obd_size *count, obd_off *offset, + obd_flag *flags, brw_callback_t callback, void *data) { if (cmd & OBD_BRW_WRITE) return osc_brw_write(conn, md, page_count, page_array, count, - offset, flags, (bulk_callback_t)callback); + offset, flags, callback, data); else return osc_brw_read(conn, md, page_count, page_array, count, - offset, flags, (bulk_callback_t)callback); + offset, flags, callback, data); } static int osc_enqueue(struct lustre_handle *conn,