static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
{
- struct ptlrpc_bulk_desc **bulk_vec = NULL, *bulk = NULL;
+ struct ptlrpc_bulk_desc *desc;
struct obd_conn conn;
void *tmp1, *tmp2, *end2;
- struct niobuf *nb, *dst, *res = NULL;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
- niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = body->data;
conn.oc_id = body->connid;
ost_unpack_ioo(&tmp1, &ioo);
if (tmp2 + ioo->ioo_bufcnt > end2) {
LBUG();
- rc = -EFAULT;
- break;
+ GOTO(out, rc = -EFAULT);
}
for (j = 0; j < ioo->ioo_bufcnt; j++)
- ost_unpack_niobuf(&tmp2, &nb);
+ ost_unpack_niobuf(&tmp2, &remote_nb);
}
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
- OBD_ALLOC(res, sizeof(*res) * niocount);
- if (res == NULL)
+ OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+ if (local_nb == NULL)
RETURN(-ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, res);
+ tmp1, niocount, tmp2, local_nb);
if (req->rq_status)
- GOTO(out, 0);
+ GOTO(out_local, 0);
- for (i = 0; i < niocount; i++) {
- bulk = ptlrpc_prep_bulk(req->rq_connection);
- if (bulk == NULL) {
- CERROR("cannot alloc bulk desc\n");
- GOTO(out, rc = -ENOMEM);
- }
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(out_local, rc = -ENOMEM);
+ desc->b_portal = OST_BULK_PORTAL;
- dst = &(((struct niobuf *)tmp2)[i]);
- bulk->b_xid = dst->xid;
- bulk->b_buf = (void *)(unsigned long)res[i].addr;
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+ bulk->b_xid = remote_nb->xid;
+ bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
bulk->b_buflen = PAGE_SIZE;
- rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
- if (rc)
- GOTO(out, rc);
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
+ }
- if (bulk->b_flags & PTL_RPC_FL_INTR)
- GOTO(out, 0);
+ rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(out_bulk, rc);
- ptlrpc_free_bulk(bulk);
- }
- bulk = NULL;
+ ptlrpc_free_bulk(desc);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_commitrw(cmd, &conn, objcount,
- tmp1, niocount, res);
+ tmp1, niocount, local_nb);
- EXIT;
- out:
- if (res != NULL)
- OBD_FREE(res, sizeof(*res) * niocount);
- if (bulk != NULL)
- ptlrpc_free_bulk(bulk);
- if (bulk_vec != NULL) {
- for (i = 0; i < niocount; i++)
- if (bulk_vec[i] != NULL)
- ptlrpc_free_bulk(bulk_vec[i]);
- OBD_FREE(bulk_vec, niocount * sizeof(*bulk_vec));
- }
+ RETURN(rc);
+ out_bulk:
+ ptlrpc_free_bulk(desc);
+ out_local:
+ if (local_nb != NULL)
+ OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
return 0;
}
static int ost_commit_page(struct obd_conn *conn, struct page *page)
{
struct obd_ioobj obj;
- struct niobuf buf;
+ struct niobuf_local buf;
int rc;
ENTRY;
RETURN(rc);
}
-static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
{
+ void *journal_save;
int rc;
-
ENTRY;
- rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
+ /* Restore the filesystem journal context when we do the commit.
+ * This is needed for ext3 and reiserfs, but can't really hurt
+ * other filesystems.
+ */
+ journal_save = current->journal_info;
+ current->journal_info = bulk->b_desc->b_journal_info;
+ CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current,
+ journal_save, bulk->b_desc->b_journal_info);
+ rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
+ current->journal_info = journal_save;
+ CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current,
+ journal_save);
if (rc)
CERROR("ost_commit_page failed: %d\n", rc);
RETURN(rc);
}
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+{
+ ptlrpc_free_bulk(desc);
+
+ return 0;
+}
+
static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
{
+ struct ptlrpc_bulk_desc *desc;
struct obd_conn conn;
- struct niobuf *nb, *res;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb, *lnb;
struct obd_ioobj *ioo;
struct ost_body *body;
int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
- niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = body->data;
conn.oc_id = body->connid;
break;
}
for (j = 0; j < ioo->ioo_bufcnt; j++)
- ost_unpack_niobuf((void *)&tmp2, &nb);
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
}
- size[1] = niocount * sizeof(*nb);
+ size[1] = niocount * sizeof(*remote_nb);
rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
- RETURN(rc);
+ GOTO(fail, rc);
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
- res = lustre_msg_buf(req->rq_repmsg, 1);
+ OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+ if (local_nb == NULL)
+ GOTO(fail, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, res);
-
+ tmp1, niocount, tmp2, local_nb);
if (req->rq_status)
- GOTO(out, 0);
+ GOTO(success, 0);
+
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(fail_preprw, rc = -ENOMEM);
+ desc->b_cb = ost_brw_write_finished_cb;
+ desc->b_portal = OSC_BULK_PORTAL;
+ memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+ /* Save journal context for commit callbacks */
+ CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current,
+ current->journal_info);
+ desc->b_journal_info = current->journal_info;
- for (i = 0; i < niocount; i++, res++) {
- struct ptlrpc_bulk_desc *bulk;
+ for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+ struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk(req->rq_connection);
+ bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
- GOTO(out, rc = -ENOMEM);
+ GOTO(fail_bulk, rc = -ENOMEM);
spin_lock(&srv->srv_lock);
bulk->b_xid = srv->srv_xid++;
spin_unlock(&srv->srv_lock);
- res->xid = HTON__u32(bulk->b_xid);
-
- bulk->b_buf = (void *)(unsigned long)res->addr;
- bulk->b_cb = ost_brw_write_cb;
- bulk->b_page = res->page;
- memcpy(&(bulk->b_conn), &conn, sizeof(conn));
+ bulk->b_buf = (void *)(unsigned long)lnb->addr;
+ bulk->b_page = lnb->page;
bulk->b_buflen = PAGE_SIZE;
- bulk->b_portal = OSC_BULK_PORTAL;
- rc = ptlrpc_register_bulk(bulk);
- if (rc)
- GOTO(out, rc);
+ bulk->b_cb = ost_brw_write_cb;
+
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+ bulk->b_xid);
}
+ rc = ptlrpc_register_bulk(desc);
+ current->journal_info = NULL; /* kind of scary */
+ if (rc)
+ GOTO(fail_bulk, rc);
+
EXIT;
- out:
- /* FIXME: should we return 'rc' here? */
+ success:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
return 0;
+
+ fail_bulk:
+ ptlrpc_free_bulk(desc);
+ fail_preprw:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ /* FIXME: how do we undo the preprw? */
+ fail:
+ return rc;
}
static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
-#if 0
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
-#endif
RETURN(0);