LASSERT(desc->bd_brw_set != NULL);
LASSERT(desc->bd_brw_set->brw_callback != NULL);
+ /* Don't use desc->bd_brw_set after this callback runs; if you need
+ * it afterwards, take a reference on it first. */
desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
/* We can't kunmap the desc from interrupt context, so we do it from
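A minimal sketch of the lifetime rule in the comment above, assuming a hypothetical brw_refcount field and obd_brw_set_decref() helper (this tree may spell the reference counting differently):

        /* Hypothetical caller that still needs the set after CB_PHASE_FINISH:
         * take a reference before the callback, drop it when done. */
        struct obd_brw_set *set = desc->bd_brw_set;

        atomic_inc(&set->brw_refcount);          /* hypothetical field */
        set->brw_callback(set, CB_PHASE_FINISH); /* may drop the last ref */
        /* 'set' is still safe here only because of our extra reference */
        obd_brw_set_decref(set);                 /* hypothetical helper */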
LASSERT(desc->bd_brw_set != NULL);
- ptlrpc_abort_bulk(desc);
+ /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
+ * just make osc_ptl_ev_hdlr() check desc->bd_flags for either
+ * PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
+ * to brw_callback() and do the rest of the cleanup there. I
+ * also think ll_sync_brw_timeout() is missing a PtlMEUnlink,
+ * but I could be wrong.
+ */
+ if (ptlrpc_abort_bulk(desc)) {
+ EXIT;
+ return;
+ }
obd_brw_set_del(desc);
unmap_and_decref_bulk_desc(desc);
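To make the XXX above concrete, the check it proposes for osc_ptl_ev_hdlr() might look roughly like this (a sketch only, using the flag and phase names from the comment, not an actual implementation):

        if (desc->bd_flags & (PTL_BULK_FL_RCVD | PTL_BULK_FL_SENT)) {
                /* bulk completed before the timeout: finish normally */
                desc->bd_brw_set->brw_callback(desc->bd_brw_set,
                                               CB_PHASE_FINISH);
        } else {
                /* nothing moved: hand the abort cleanup to the callback */
                desc->bd_brw_set->brw_callback(desc->bd_brw_set,
                                               CB_PHASE_ABORT);
        }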
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
struct obd_ioobj *iooptr;
- void *nioptr;
+ struct niobuf_remote *nioptr;
__u32 xid;
ENTRY;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, lsm, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* end almost identical to brw_write case */
xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL) {
unmap_and_decref_bulk_desc(desc);
GOTO(out_req, rc = -ENOMEM);
}
- bulk->bp_xid = xid; /* single xid for all pages */
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
+ bulk->bp_xid = xid; /* single xid for all pages */
bulk->bp_buf = kmap(pga[mapped].pg);
bulk->bp_page = pga[mapped].pg;
bulk->bp_buflen = PAGE_SIZE;
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, bulk->bp_xid);
}
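The new LASSERT encodes a caller-side invariant: pga[] must arrive sorted by strictly increasing file offset. A hypothetical debugging helper (not in the tree) spelling out what the per-iteration assertion checks pairwise:

        static int brw_pages_ordered(struct brw_page *pga, obd_count page_count)
        {
                obd_count i;

                /* strictly ascending offsets, hence no duplicate pages */
                for (i = 1; i < page_count; i++)
                        if (pga[i].off <= pga[i - 1].off)
                                return 0;
                return 1;
        }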
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
struct obd_ioobj *iooptr;
- void *nioptr;
+ struct niobuf_remote *nioptr;
__u32 xid;
#if CHECKSUM_BULK
__u64 cksum = 0;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, lsm, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* end almost identical to brw_read case */
xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL) {
unmap_and_decref_bulk_desc(desc);
GOTO(out_req, rc = -ENOMEM);
}
- bulk->bp_xid = xid; /* single xid for all pages */
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
+ bulk->bp_xid = xid; /* single xid for all pages */
bulk->bp_buf = kmap(pga[mapped].pg);
bulk->bp_page = pga[mapped].pg;
+ /* matches the LASSERT in ptlrpc_bulk_get() */
+ LASSERT(pga[mapped].count > 0);
bulk->bp_buflen = pga[mapped].count;
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, bulk->bp_xid);
ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
}
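For reference, a bulk checksum in the spirit of the ost_checksum() call above can be as simple as a running sum; this sketch assumes a trivial 32-bit-word sum and is not necessarily the algorithm ost_checksum() actually uses:

        /* Illustrative only: accumulate a sum of 32-bit words into the
         * __u64 counter used above; trailing partial words are ignored. */
        static void brw_cksum_sketch(__u64 *cksum, void *buf, int len)
        {
                __u32 *p = buf;

                for (; len >= (int)sizeof(*p); len -= sizeof(*p))
                        *cksum += *p++;
        }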
+#warning "FIXME: make these values dynamic based on a get_info call at setup"
#define OSC_BRW_MAX_SIZE 65536
#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+
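The FIXME asks for these limits to come from the server at setup time. A hedged sketch of what that could look like; the "brw_limits" key is invented here, and the obd_get_info() call shape is illustrative rather than a real protocol:

        /* Hypothetical setup-time query replacing the hard-coded limits. */
        static int osc_get_brw_limits(struct lustre_handle *conn,
                                      __u32 *max_size, __u32 *max_iov)
        {
                __u32 vals[2];                  /* { max brw size, max iov } */
                __u32 vallen = sizeof(vals);
                int rc;

                rc = obd_get_info(conn, sizeof("brw_limits") - 1, "brw_limits",
                                  &vallen, vals);
                if (rc == 0 && vallen == sizeof(vals)) {
                        *max_size = vals[0];
                        *max_iov = min_t(__u32, vals[1], PTL_MD_MAX_IOV);
                }
                return rc;
        }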
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
struct brw_page *pga, struct obd_brw_set *set,
/* Note: caller will lock/unlock, and set uptodate on the pages */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int sanosc_brw_read(struct lustre_handle *conn,
- struct lov_stripe_md *md,
+ struct lov_stripe_md *lsm,
obd_count page_count,
struct brw_page *pga,
struct obd_brw_set *set)
{
struct ptlrpc_request *request = NULL;
struct ost_body *body;
- struct niobuf_remote *remote, *nio_rep;
- int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct niobuf_remote *nioptr;
struct obd_ioobj *iooptr;
- void *nioptr;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
- size[2] = page_count * sizeof(*remote);
+ size[2] = page_count * sizeof(*nioptr);
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
size, NULL);
body = lustre_msg_buf(request->rq_reqmsg, 0);
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
kmap(pga[mapped].pg);
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, 0);
}
- size[1] = page_count * sizeof(*remote);
+ size[1] = page_count * sizeof(*nioptr);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
GOTO(out_unmap, rc = -EINVAL);
}
- for (j = 0; j < page_count; j++) {
- ost_unpack_niobuf(&nioptr, &remote);
- }
-
- nioptr = lustre_msg_buf(request->rq_repmsg, 1);
- nio_rep = (struct niobuf_remote*)nioptr;
-
/* actual read */
- for (j = 0; j < page_count; j++) {
+ for (j = 0; j < page_count; j++, nioptr++) {
struct page *page = pga[j].pg;
struct buffer_head *bh;
kdev_t dev;
+ ost_unpack_niobuf(nioptr, nioptr); /* unpack the reply niobuf in place */
/* got san device associated */
LASSERT(class_conn2obd(conn));
dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* hole */
- if (!nio_rep[j].offset) {
+ if (!nioptr->offset) {
CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
page->mapping->host->i_ino,
page->index);
clear_bit(BH_New, &bh->b_state);
set_bit(BH_Mapped, &bh->b_state);
- bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+ bh->b_blocknr = (unsigned long)nioptr->offset;
clear_bit(BH_Uptodate, &bh->b_state);
* one we mapped before, check it */
LASSERT(!test_bit(BH_New, &bh->b_state));
LASSERT(test_bit(BH_Mapped, &bh->b_state));
- LASSERT(bh->b_blocknr ==
- (unsigned long)nio_rep[j].offset);
+ LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
- /* wait it's io completion */
+ /* wait for its I/O completion */
if (test_bit(BH_Lock, &bh->b_state))
}
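The reply niobufs double as a block map here: offset 0 marks a hole, anything else is a block number on the SAN device. In outline (not the literal elided code, which presumably zero-fills the hole):

        if (nioptr->offset == 0) {
                /* hole: synthesize a zero page instead of issuing SAN I/O
                 * (the page was kmapped when the request was packed) */
                memset(page_address(page), 0, PAGE_SIZE);
        } else {
                /* data: point the buffer_head at the SAN block for real I/O */
                bh->b_blocknr = (unsigned long)nioptr->offset;
        }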
static int sanosc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md,
+ struct lov_stripe_md *lsm,
obd_count page_count,
struct brw_page *pga,
struct obd_brw_set *set)
{
struct ptlrpc_request *request = NULL;
struct ost_body *body;
- struct niobuf_remote *remote, *nio_rep;
- int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct niobuf_remote *nioptr;
struct obd_ioobj *iooptr;
- void *nioptr;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
- size[2] = page_count * sizeof(*remote);
+ size[2] = page_count * sizeof(*nioptr);
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
3, size, NULL);
body = lustre_msg_buf(request->rq_reqmsg, 0);
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(iooptr, lsm, page_count);
/* map pages, and pack request */
obd_kmap_get(page_count, 0);
- for (mapped = 0; mapped < page_count; mapped++) {
+ for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
LASSERT(PageLocked(pga[mapped].pg));
+ LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
kmap(pga[mapped].pg);
- ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, 0);
}
- size[1] = page_count * sizeof(*remote);
+ size[1] = page_count * sizeof(*nioptr);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
GOTO(out_unmap, rc = -EINVAL);
}
- for (j = 0; j < page_count; j++) {
- ost_unpack_niobuf(&nioptr, &remote);
- }
-
- nioptr = lustre_msg_buf(request->rq_repmsg, 1);
- nio_rep = (struct niobuf_remote*)nioptr;
-
/* actual write */
- for (j = 0; j < page_count; j++) {
+ for (j = 0; j < page_count; j++, nioptr++) {
struct page *page = pga[j].pg;
struct buffer_head *bh;
kdev_t dev;
+ ost_unpack_niobuf(nioptr, nioptr); /* unpack the reply niobuf in place */
/* got san device associated */
LASSERT(class_conn2obd(conn));
dev = class_conn2obd(conn)->u.cli.cl_sandev;
LASSERT(!test_bit(BH_New, &page->buffers->b_state));
LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
LASSERT(page->buffers->b_blocknr ==
- (unsigned long)nio_rep[j].offset);
+ (unsigned long)nioptr->offset);
}
bh = page->buffers;
set_bit(BH_Mapped, &bh->b_state);
/* override the block nr */
- bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+ bh->b_blocknr = (unsigned long)nioptr->offset;
/* we are about to write it, so set it
* uptodate/dirty
goto out_req;
}
-#else
-static int sanosc_brw_read(struct lustre_handle *conn,
- struct lov_stripe_md *md,
- obd_count page_count,
- struct brw_page *pga,
- struct obd_brw_set *set)
-{
- LBUG();
- return 0;
-}
-
-static int sanosc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md,
- obd_count page_count,
- struct brw_page *pga,
- struct obd_brw_set *set)
-{
- LBUG();
- return 0;
-}
-#endif
static int sanosc_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *md, obd_count page_count,
+ struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page *pga, struct obd_brw_set *set,
struct obd_trans_info *oti)
{
pages_per_brw = page_count;
if (cmd & OBD_BRW_WRITE)
- rc = sanosc_brw_write(conn, md, pages_per_brw,
+ rc = sanosc_brw_write(conn, lsm, pages_per_brw,
pga, set);
else
- rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
+ rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga, set);
if (rc != 0)
RETURN(rc);
RETURN(0);
}
#endif
+#endif
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
struct lustre_handle *parent_lock,
sizeof(extent), mode, lockh);
if (rc == 1)
/* We already have a lock, and it's referenced */
- RETURN(ELDLM_OK);
+ RETURN(ELDLM_LOCK_MATCHED);
/* If we're trying to read, we also search for an existing PW lock. The
* VFS and page cache already protect us locally, so lots of readers/
ldlm_lock_addref(lockh, LCK_PR);
ldlm_lock_decref(lockh, LCK_PW);
- RETURN(ELDLM_OK);
+ RETURN(ELDLM_LOCK_MATCHED);
}
}
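With this change callers can tell a fresh grant from a matched lock. A hypothetical call site (argument list abridged) showing why the distinction is useful:

        rc = osc_enqueue(connh, lsm, parent_lock, /* ... */ mode, &lockh);
        if (rc == ELDLM_LOCK_MATCHED)
                rc = ELDLM_OK;  /* existing lock reused, already referenced */
        else if (rc != ELDLM_OK)
                RETURN(rc);     /* real enqueue failure */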