Whamcloud - gitweb
Make a distinction between bulk callbacks and brw callbacks - the bulk
authoradilger <adilger>
Wed, 24 Jul 2002 16:43:48 +0000 (16:43 +0000)
committeradilger <adilger>
Wed, 24 Jul 2002 16:43:48 +0000 (16:43 +0000)
callbacks take a ptlrpc_bulk_desc as a parameter, and brw in general has
no idea about that (not that we use the brw callback yet).  Also add a
data parameter for the brw callback, otherwise it is mostly useless.

Some minor prep work for fixing the osc_brw_{read,write} request leak.

lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/llite/rw.c
lustre/mds/mds_extN.c
lustre/obdclass/class_obd.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c

index 4d76a62..ac4a75e 100644 (file)
@@ -209,14 +209,14 @@ struct obd_device {
         } u;
 };
 
+typedef void (*brw_callback_t)(void *);
+
 struct obd_ops {
-        int (*o_iocontrol)(long cmd, struct lustre_handle *, int len, void *karg,
-                           void *uarg);
-        int (*o_get_info)(struct lustre_handle *, 
-                          obd_count keylen, void *key,
+        int (*o_iocontrol)(long cmd, struct lustre_handle *, int len,
+                           void *karg, void *uarg);
+        int (*o_get_info)(struct lustre_handle *, obd_count keylen, void *key,
                           obd_count *vallen, void **val);
-        int (*o_set_info)(struct lustre_handle *, 
-                          obd_count keylen, void *key,
+        int (*o_set_info)(struct lustre_handle *, obd_count keylen, void *key,
                           obd_count vallen, void *val);
         int (*o_attach)(struct obd_device *dev, obd_count len, void *data);
         int (*o_detach)(struct obd_device *dev);
@@ -227,10 +227,11 @@ struct obd_ops {
 
 
         int (*o_statfs)(struct lustre_handle *conn, struct statfs *statfs);
-        int (*o_preallocate)(struct lustre_handle *, obd_count *req, obd_id *ids);
-        int (*o_create)(struct lustre_handle *conn,  struct obdo *oa, 
+        int (*o_preallocate)(struct lustre_handle *, obd_count *req,
+                             obd_id *ids);
+        int (*o_create)(struct lustre_handle *conn,  struct obdo *oa,
                         struct lov_stripe_md **ea);
-        int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa, 
+        int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa,
                          struct lov_stripe_md *ea);
         int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa);
         int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa);
@@ -238,23 +239,22 @@ struct obd_ops {
                       struct lov_stripe_md *);
         int (*o_close)(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *);
-        int (*o_brw)(int rw, struct lustre_handle *conn, 
-                     struct lov_stripe_md *md, obd_count oa_bufs, 
-                     struct page **buf,
-                     obd_size *count, 
-                     obd_off *offset, 
-                     obd_flag *flags,
-                     void *);
-        int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt, struct lov_stripe_md *md, obd_size count,
+        int (*o_brw)(int rw, struct lustre_handle *conn,
+                     struct lov_stripe_md *md, obd_count oa_bufs,
+                     struct page **buf, obd_size *count, obd_off *offset,
+                     obd_flag *flags, brw_callback_t callback, void * data);
+        int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt,
+                       struct lov_stripe_md *md, obd_size count,
                        obd_off offset);
-        int (*o_sync)(struct lustre_handle *conn, struct obdo *tgt, obd_size count,
-                      obd_off offset);
+        int (*o_sync)(struct lustre_handle *conn, struct obdo *tgt,
+                      obd_size count, obd_off offset);
         int (*o_migrate)(struct lustre_handle *conn, struct obdo *dst,
                          struct obdo *src, obd_size count, obd_off offset);
         int (*o_copy)(struct lustre_handle *dstconn, struct obdo *dst,
                       struct lustre_handle *srconn, struct obdo *src,
                       obd_size count, obd_off offset);
-        int (*o_iterate)(struct lustre_handle *conn, int (*)(obd_id, obd_gr, void *),
+        int (*o_iterate)(struct lustre_handle *conn,
+                         int (*)(obd_id, obd_gr, void *),
                          obd_id *startid, obd_gr group, void *data);
         int (*o_preprw)(int cmd, struct lustre_handle *conn,
                         int objcount, struct obd_ioobj *obj,
@@ -269,7 +269,8 @@ struct obd_ops {
                          __u32 type, void *cookie, int cookielen, __u32 mode,
                          int *flags, void *cb, void *data, int datalen,
                          struct lustre_handle *lockh);
-        int (*o_cancel)(struct lustre_handle *, __u32 mode, struct lustre_handle *);
+        int (*o_cancel)(struct lustre_handle *, __u32 mode,
+                        struct lustre_handle *);
 };
 
 #endif
index 5469666..906e42c 100644 (file)
@@ -334,7 +334,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn,
                           obd_size *count, 
                           obd_off *offset,
                           obd_flag *flags, 
-                          void *callback)
+                          brw_callback_t callback, void *data)
 {
         int rc;
         struct obd_export *export;
@@ -347,7 +347,7 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn,
         }
 
         rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, buf,
-                                    count, offset, flags, callback);
+                                       count, offset, flags, callback, data);
         RETURN(rc);
 }
 
index 46689a1..32b45de 100644 (file)
@@ -45,7 +45,7 @@ static int ll_brw(int rw, struct inode *inode, struct page *page, int create)
         ENTRY;
 
         err = obd_brw(rw, ll_i2obdconn(inode), md, 1,
-                      &page, &count, &offset, &flags, NULL);
+                      &page, &count, &offset, &flags, NULL, NULL);
         RETURN(err);
 } /* ll_brw */
 
@@ -161,11 +161,11 @@ static int ll_commit_write(struct file *file, struct page *page,
         if (!PageLocked(page))
                 LBUG();
 
-        CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n", 
+        CDEBUG(D_INODE, "commit_page writing (at %d) to %d, count %Ld\n",
                from, to, (unsigned long long)count);
 
-        err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 
-                      1, &page, &count, &offset, &flags, NULL);
+        err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md,
+                      1, &page, &count, &offset, &flags, NULL, NULL);
         kunmap(page);
 
         iattr.ia_size = offset + to;
@@ -257,7 +257,7 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
 
         rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                      ll_i2obdconn(inode), md, bufs_per_obdo,
-                     iobuf->maplist, count, offset, flags, NULL);
+                     iobuf->maplist, count, offset, flags, NULL, NULL);
         if (rc == 0)
                 rc = bufs_per_obdo * PAGE_SIZE;
 
@@ -271,10 +271,7 @@ out:
 
 int ll_flush_inode_pages(struct inode * inode)
 {
-        //int i;
-        //        obd_count        num_obdo = 1;
         obd_count        bufs_per_obdo = 0;
-        struct obdo     *oa = NULL;
         obd_size         *count = NULL;
         obd_off          *offset = NULL;
         obd_flag         *flags = NULL;
@@ -294,23 +291,19 @@ int ll_flush_inode_pages(struct inode * inode)
                 GOTO(out, err=-ENOMEM);
 
 #if 0
-        for (i = 0 ; i < bufs_per_obdo ; i++) { 
+        for (i = 0 ; i < bufs_per_obdo ; i++) {
                 count[i] = PAGE_SIZE;
                 offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
                 flags[i] = OBD_BRW_CREATE;
         }
 
-        oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
-        if (!oa)
-                RETURN(-ENOMEM);
-
-        err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
-                      iobuf->maplist, count, offset, flags);
-        if (err == 0) 
+        err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
+                      ll_i2info(inode)->lli_smd, bufs_per_obdo,
+                      iobuf->maplist, count, offset, flags, NULL, NULL);
+        if (err == 0)
                 err = bufs_per_obdo * 4096;
 #endif
  out:
-        obdo_free(oa);
         OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
         OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
         OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
@@ -326,7 +319,7 @@ struct address_space_operations ll_aops = {
         direct_IO: ll_direct_IO,
 #endif
         sync_page: block_sync_page,
-        prepare_write: ll_prepare_write, 
+        prepare_write: ll_prepare_write,
         commit_write: ll_commit_write,
         bmap: NULL
 };
index c3c50c5..fcf9faf 100644 (file)
@@ -293,8 +293,8 @@ static struct mds_fs_operations mds_extN_fs_ops = {
         fs_start:               mds_extN_start,
         fs_commit:              mds_extN_commit,
         fs_setattr:             mds_extN_setattr,
-        fs_set_md:            mds_extN_set_md,
-        fs_get_md:            mds_extN_get_md,
+        fs_set_md:              mds_extN_set_md,
+        fs_get_md:              mds_extN_get_md,
         fs_readpage:            mds_extN_readpage,
         fs_delete_inode:        mds_extN_delete_inode,
         cl_delete_inode:        clear_inode,
index ed521eb..73251d2 100644 (file)
@@ -525,7 +525,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                 }
 
                 err = obd_brw(rw, &conn, &smd, j, bufs, counts, offsets, flags,
-                              NULL);
+                              NULL, NULL);
 
                 EXIT;
         brw_cleanup:
index 97f8038..f708884 100644 (file)
@@ -712,7 +712,8 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
 static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, 
                                struct lov_stripe_md *md, obd_count oa_bufs,
                                struct page **pages, obd_size *count,
-                               obd_off *offset, obd_flag *flags, void *callback)
+                               obd_off *offset, obd_flag *flags,
+                               brw_callback_t callback, void *data)
 {
         struct obd_run_ctxt      saved;
         struct super_block      *sb;
@@ -737,7 +738,7 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
         file = filter_obj_open(obd, md->lmd_object_id, S_IFREG);
         if (IS_ERR(file))
                 GOTO(out, retval = PTR_ERR(file));
-        
+
         /* count doubles as retval */
         for (pg = 0; pg < oa_bufs; pg++) {
                 CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) "
@@ -751,21 +752,23 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
                         char *buffer;
                         off = offset[pnum];
                         buffer = kmap(pages[pnum]);
-                        retval = file->f_op->write(file, buffer, count[pnum], &off);
+                        retval = file->f_op->write(file, buffer, count[pnum],
+                                                   &off);
                         kunmap(pages[pnum]);
                         CDEBUG(D_INODE, "retval %ld\n", retval);
                 } else {
                         loff_t off = offset[pnum];
                         char *buffer = kmap(pages[pnum]);
-                        
+
                         if (off >= file->f_dentry->d_inode->i_size) {
                                 memset(buffer, 0, count[pnum]);
                                 retval = count[pnum];
                         } else {
-                                retval = file->f_op->read(file, buffer, count[pnum], &off);
+                                retval = file->f_op->read(file, buffer,
+                                                          count[pnum], &off);
                         }
                         kunmap(pages[pnum]);
-                        
+
                         if (retval != count[pnum]) {
                                 filp_close(file, 0);
                                 GOTO(out, retval = -EIO);
@@ -778,6 +781,7 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn,
         /* ctimes/mtimes will follow with a setattr call */
         filp_close(file, 0);
 
+        /* XXX: do something with callback if it is set? */
 
         EXIT;
 out:
@@ -1359,8 +1363,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
                 obd_flag         flagw = OBD_BRW_CREATE;
 
                 page->index = index;
-                err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1,
-                             &page, &brw_count, &brw_offset, &flagr, NULL);
+                err = obd_brw(OBD_BRW_READ, src_conn, &srcmd, 1, &page,
+                              &brw_count, &brw_offset, &flagr, NULL, NULL);
 
                 if ( err ) {
                         EXIT;
@@ -1368,8 +1372,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst,
                 }
                 CDEBUG(D_INFO, "Read page %ld ...\n", page->index);
 
-                err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1,
-                             &page, &brw_count, &brw_offset, &flagw, NULL);
+                err = obd_brw(OBD_BRW_WRITE, dst_conn, &dstmd, 1, &page,
+                              &brw_count, &brw_offset, &flagw, NULL, NULL);
 
                 /* XXX should handle dst->o_size, dst->o_blocks here */
                 if ( err ) {
index d3fdd6f..8594c94 100644 (file)
@@ -422,7 +422,7 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
 }
 
 struct osc_brw_cb_data {
-        bulk_callback_t callback;
+        brw_callback_t callback;
         void *cb_data;
         void *obd_data;
         size_t obd_size;
@@ -432,10 +432,11 @@ struct osc_brw_cb_data {
 static void unmap_and_decref_bulk_desc(void *data)
 {
         struct ptlrpc_bulk_desc *desc = data;
-        struct list_head *tmp, *next;
+        struct list_head *tmp;
         ENTRY;
+
         /* This feels wrong to me. */
-        list_for_each_safe(tmp, next, &desc->b_page_list) {
+        list_for_each(tmp, &desc->b_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
 
@@ -455,10 +456,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
                 CERROR("got signal\n");
 
         if (cb_data->callback)
-                (cb_data->callback)(desc, cb_data->cb_data);
+                cb_data->callback(cb_data->cb_data);
 
-        if (cb_data->obd_data)
-                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
+        OBD_FREE(cb_data->obd_data, cb_data->obd_size);
         OBD_FREE(cb_data, sizeof(*cb_data));
 
         /* We can't kunmap the desc from interrupt context, so we do it from
@@ -473,7 +473,7 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct page **page_array,
                         obd_size *count, obd_off *offset, obd_flag *flags,
-                        bulk_callback_t callback)
+                        brw_callback_t callback, void *data)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -481,9 +481,10 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
-        int rc, j, size[3] = {sizeof(*body)};
-        void *iooptr, *nioptr;
         struct osc_brw_cb_data *cb_data = NULL;
+        int rc, size[3] = {sizeof(*body)};
+        void *iooptr, *nioptr;
+        int mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
@@ -507,13 +508,14 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
         if (!cb_data)
                 GOTO(out_free, rc = -ENOMEM);
         cb_data->callback = callback;
+        cb_data->cb_data = data;
         desc->b_cb_data = cb_data;
-        /* XXX end almost identical to brw_write case */
+        /* end almost identical to brw_write case */
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         ost_pack_ioo(&iooptr, md, page_count);
-        for (j = 0; j < page_count; j++) {
+        for (mapped = 0; mapped < page_count; mapped++) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
@@ -523,11 +525,11 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
                 bulk->b_xid = ++connection->c_xid_out;
                 spin_unlock(&connection->c_lock);
 
-                bulk->b_buf = kmap(page_array[j]);
-                bulk->b_page = page_array[j];
+                bulk->b_buf = kmap(page_array[mapped]);
+                bulk->b_page = page_array[mapped];
                 bulk->b_buflen = PAGE_SIZE;
-                ost_pack_niobuf(&nioptr, offset[j], count[j],
-                                flags[j], bulk->b_xid);
+                ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
+                                flags[mapped], bulk->b_xid);
         }
 
         /*
@@ -561,8 +563,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
 
         /* Clean up on error. */
  out_unmap:
-        for (j = 0; j < desc->b_page_count; j++)
-                kunmap(page_array[j]);
+        while (mapped-- > 0)
+                kunmap(page_array[mapped]);
  out_free:
         if (cb_data)
                 OBD_FREE(cb_data, sizeof(*cb_data));
@@ -575,7 +577,7 @@ static int osc_brw_write(struct lustre_handle *conn,
                          struct lov_stripe_md *md, obd_count page_count,
                          struct page **pagearray, obd_size *count,
                          obd_off *offset, obd_flag *flags,
-                         bulk_callback_t callback)
+                         brw_callback_t callback, void *data)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -588,6 +590,7 @@ static int osc_brw_write(struct lustre_handle *conn,
         struct osc_brw_cb_data *cb_data = NULL;
         int rc, j, size[3] = {sizeof(*body)};
         void *iooptr, *nioptr;
+        int mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
@@ -615,22 +618,26 @@ static int osc_brw_write(struct lustre_handle *conn,
         if (!cb_data)
                 GOTO(out_free, rc = -ENOMEM);
         cb_data->callback = callback;
+        cb_data->cb_data = data;
         desc->b_cb_data = cb_data;
-        /* XXX end almost identical to brw_read case */
-        cb_data->obd_data = local;
-        cb_data->obd_size = page_count * sizeof(*local);
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         ost_pack_ioo(&iooptr, md, page_count);
-        for (j = 0; j < page_count; j++) {
-                local[j].addr = kmap(pagearray[j]);
-                local[j].offset = offset[j];
-                local[j].len = count[j];
-                ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
+        /* end almost identical to brw_read case */
+
+        cb_data->obd_data = local;
+        cb_data->obd_size = page_count * sizeof(*local);
+
+        for (mapped = 0; mapped < page_count; mapped++) {
+                local[mapped].addr = kmap(pagearray[mapped]);
+                local[mapped].offset = offset[mapped];
+                local[mapped].len = count[mapped];
+                ost_pack_niobuf(&nioptr, offset[mapped], count[mapped],
+                                flags[mapped], 0);
         }
 
-        size[1] = page_count * sizeof(struct niobuf_remote);
+        size[1] = page_count * sizeof(*remote);
         request->rq_replen = lustre_msg_size(2, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
@@ -641,26 +648,24 @@ static int osc_brw_write(struct lustre_handle *conn,
         if (!nioptr)
                 GOTO(out_unmap, rc = -EINVAL);
 
-        if (request->rq_repmsg->buflens[1] !=
-            page_count * sizeof(struct niobuf_remote)) {
+        if (request->rq_repmsg->buflens[1] != size[1]) {
                 CERROR("buffer length wrong (%d vs. %d)\n",
-                       request->rq_repmsg->buflens[1],
-                       page_count * sizeof(struct niobuf_remote));
+                       request->rq_repmsg->buflens[1], size[1]);
                 GOTO(out_unmap, rc = -EINVAL);
         }
 
         for (j = 0; j < page_count; j++) {
-                struct ptlrpc_bulk_page *page;
+                struct ptlrpc_bulk_page *bulk;
 
                 ost_unpack_niobuf(&nioptr, &remote);
 
-                page = ptlrpc_prep_bulk_page(desc);
-                if (!page)
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (!bulk)
                         GOTO(out_unmap, rc = -ENOMEM);
 
-                page->b_buf = (void *)(unsigned long)local[j].addr;
-                page->b_buflen = local[j].len;
-                page->b_xid = remote->xid;
+                bulk->b_buf = (void *)(unsigned long)local[j].addr;
+                bulk->b_buflen = local[j].len;
+                bulk->b_xid = remote->xid;
         }
 
         if (desc->b_page_count != page_count)
@@ -691,14 +696,12 @@ static int osc_brw_write(struct lustre_handle *conn,
 
         /* Clean up on error. */
  out_unmap:
-        for (j = 0; j < page_count; j++)
-                kunmap(pagearray[j]);
+        while (mapped-- > 0)
+                kunmap(pagearray[mapped]);
 
  out_free:
-        if (cb_data)
-                OBD_FREE(cb_data, sizeof(*cb_data));
-        if (local)
-                OBD_FREE(local, page_count * sizeof(*local));
+        OBD_FREE(cb_data, sizeof(*cb_data));
+        OBD_FREE(local, page_count * sizeof(*local));
         ptlrpc_free_bulk(desc);
         ptlrpc_req_finished(request);
         return rc;
@@ -706,18 +709,15 @@ static int osc_brw_write(struct lustre_handle *conn,
 
 static int osc_brw(int cmd, struct lustre_handle *conn,
                    struct lov_stripe_md *md, obd_count page_count,
-                   struct page **page_array,
-                   obd_size *count,
-                   obd_off *offset,
-                   obd_flag *flags,
-                   void *callback)
+                   struct page **page_array, obd_size *count, obd_off *offset,
+                   obd_flag *flags, brw_callback_t callback, void *data)
 {
         if (cmd & OBD_BRW_WRITE)
                 return osc_brw_write(conn, md, page_count, page_array, count,
-                                     offset, flags, (bulk_callback_t)callback);
+                                     offset, flags, callback, data);
         else
                 return osc_brw_read(conn, md, page_count, page_array, count,
-                                    offset, flags, (bulk_callback_t)callback);
+                                    offset, flags, callback, data);
 }
 
 static int osc_enqueue(struct lustre_handle *conn,