/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/version.h>
#include <linux/module.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/workqueue.h>
+#endif
+#include <linux/kp30.h>
+#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
-
-static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection)
-{
- struct osc_obd *osc = &conn->oc_dev->u.osc;
- *cl = osc->osc_client;
- *connection = osc->osc_conn;
-}
-
-static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection)
-{
- struct osc_obd *osc = &conn->oc_dev->u.osc;
- *cl = osc->osc_ldlm_client;
- *connection = osc->osc_conn;
-}
-
-static int osc_connect(struct obd_conn *conn)
-{
- struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct ost_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
- if (!request)
- RETURN(-ENOMEM);
-
- request->rq_replen = lustre_msg_size(1, &size);
-
- rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
- if (rc) {
- CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
- GOTO(out, rc);
- }
-
- body = lustre_msg_buf(request->rq_repmsg, 0);
- CDEBUG(D_INODE, "received connid %d\n", body->connid);
-
- conn->oc_id = body->connid;
- EXIT;
- out:
- ptlrpc_free_req(request);
- return rc;
-}
-
-static int osc_disconnect(struct obd_conn *conn)
+#include <linux/obd_lov.h>
+#include <linux/ctype.h>
+#include <linux/init.h>
+#include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
+#include <linux/lustre_lite.h> /* for ll_i2info */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
+
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
- if (!request)
- RETURN(-ENOMEM);
-
- body = lustre_msg_buf(request->rq_reqmsg, 0);
- body->connid = conn->oc_id;
-
- request->rq_replen = lustre_msg_size(1, &size);
-
- rc = ptlrpc_queue_wait(request);
- GOTO(out, rc);
- out:
- ptlrpc_free_req(request);
- return rc;
-}
-
-static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
-{
- struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct ost_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
- body->oa.o_valid = ~0;
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
- return 0;
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_open(struct obd_conn *conn, struct obdo *oa)
+static int osc_open(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
- if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
- LBUG();
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
- return 0;
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_close(struct obd_conn *conn, struct obdo *oa)
+static int osc_close(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
- return 0;
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
+/*
+ * Send the attributes in @oa to the OST with an OST_SETATTR request and wait
+ * for the reply via ptlrpc_queue_wait().
+ *
+ * The whole obdo is copied verbatim into the request body; @md is not used
+ * by this function.  Returns -ENOMEM if the request cannot be allocated,
+ * otherwise the status produced by ptlrpc_check_status() (the removed code
+ * always returned 0).
+ */
+static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- GOTO(out, rc);
- out:
- ptlrpc_free_req(request);
- return 0;
+ /* Done with the request; hand the RPC status back to the caller. */
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_create(struct obd_conn *conn, struct obdo *oa)
+/*
+ * Create an object on the OST with an OST_CREATE request.
+ *
+ * @oa is copied into the request and, on success, overwritten with the obdo
+ * from the reply.  @oa and @ea must be non-NULL (asserted).  If *ea is NULL a
+ * stripe descriptor of oa->o_easize bytes is allocated here, returned through
+ * *ea on success and freed again on failure; otherwise the caller's
+ * descriptor is filled in.  On success lsm_object_id is set to the new
+ * object id and lsm_stripe_count to 0.
+ *
+ * Returns -ENOMEM on allocation failure, otherwise the status produced by
+ * ptlrpc_check_status().
+ */
+static int osc_create(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md **ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
+ struct lov_stripe_md *lsm;
int rc, size = sizeof(*body);
ENTRY;
- if (!oa) {
- CERROR("oa NULL\n");
- RETURN(-EINVAL);
+ LASSERT(oa);
+ LASSERT(ea);
+
+ lsm = *ea;
+ if (!lsm) {
+ // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
+ /* NOTE(review): o_easize is used as the allocation size without
+ * validation; presumably callers guarantee it covers *lsm. */
+ OBD_ALLOC(lsm, oa->o_easize);
+ if (!lsm)
+ RETURN(-ENOMEM);
+ lsm->lsm_mds_easize = oa->o_easize;
}
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
+
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
+ NULL);
if (!request)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_valid = ~0;
- body->connid = conn->oc_id;
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out, rc);
+ GOTO(out_req, rc);
body = lustre_msg_buf(request->rq_repmsg, 0);
memcpy(oa, &body->oa, sizeof(*oa));
+ lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_stripe_count = 0;
+ *ea = lsm;
EXIT;
- out:
- ptlrpc_free_req(request);
- return 0;
+out_req:
+ ptlrpc_req_finished(request);
+out:
+ /* *ea is still NULL only when lsm was allocated above and never
+ * published, so this frees exactly our own allocation on failure. */
+ if (rc && !*ea)
+ OBD_FREE(lsm, oa->o_easize);
+ return rc;
}
-static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
- obd_off offset)
+static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md, obd_size start,
+ obd_size end)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
+
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
- body->oa.o_valid = ~0;
- body->oa.o_size = offset;
- body->oa.o_blocks = count;
+
+ /* overload the size and blocks fields in the oa with start/end */
+ body->oa.o_size = HTON__u64(start);
+ body->oa.o_blocks = HTON__u64(end);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
- return 0;
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
+static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->connid = conn->oc_id;
- body->oa.o_valid = ~0;
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
- return 0;
+ ptlrpc_req_finished(request);
+ return rc;
}
-static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
- struct niobuf_remote *dst, struct niobuf_local *src)
+/* Per-bulk state handed to brw_finish through desc->bd_cb_data. */
+struct osc_brw_cb_data {
+ brw_callback_t callback; /* invoked with CB_PHASE_FINISH if non-NULL */
+ void *cb_data; /* first argument passed to callback */
+ void *obd_data; /* optional buffer freed by brw_finish (the niobuf_local array on writes) */
+ size_t obd_size; /* size in bytes of obd_data */
+};
+
+/*
+ * Deferred half of bulk completion, queued by brw_finish: kunmap() every
+ * page attached to the bulk descriptor passed in @data, then drop one
+ * reference on the descriptor.
+ */
+static void unmap_and_decref_bulk_desc(void *data)
{
- struct ptlrpc_bulk_page *page;
+ struct ptlrpc_bulk_desc *bulk_desc = data;
+ struct list_head *pos;
ENTRY;
- page = ptlrpc_prep_bulk_page(desc);
- if (page == NULL)
- RETURN(-ENOMEM);
+ /* Original author's note: "This feels wrong to me." */
+ list_for_each(pos, &bulk_desc->bd_page_list)
+ kunmap(list_entry(pos, struct ptlrpc_bulk_page, bp_link)->bp_page);
- page->b_buf = (void *)(unsigned long)src->addr;
- page->b_buflen = src->len;
- page->b_xid = dst->xid;
- RETURN(0);
+ ptlrpc_bulk_decref(bulk_desc);
+ EXIT;
}
-static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
- struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+/*
+ * Bulk descriptor callback installed by osc_brw_read/osc_brw_write.
+ *
+ * @data is the osc_brw_cb_data attached to @desc.  The caller's callback (if
+ * any) is told the bulk finished, with -ERESTARTSYS or -ETIMEDOUT when the
+ * descriptor is flagged as timed out and 0 otherwise.  The callback state is
+ * then freed and the kunmap/decref work is pushed to a work queue.
+ */
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct ptlrpc_request *request;
+ struct osc_brw_cb_data *cbd = data;
+ int status = 0;
+ ENTRY;
+
+ if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
+ if (desc->bd_flags & PTL_RPC_FL_INTR)
+ status = -ERESTARTSYS;
+ else
+ status = -ETIMEDOUT;
+ }
+
+ if (cbd->callback)
+ cbd->callback(cbd->cb_data, status, CB_PHASE_FINISH);
+
+ /* The optional per-request buffer goes first, then the holder itself. */
+ if (cbd->obd_data)
+ OBD_FREE(cbd->obd_data, cbd->obd_size);
+ OBD_FREE(cbd, sizeof(*cbd));
+
+ /* kunmap must not run from interrupt context, so it is deferred to
+ * unmap_and_decref_bulk_desc via the descriptor's work item. */
+ prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
+ schedule_work(&desc->bd_queue);
+
+ EXIT;
+}
+
+/*
+ * Issue one OST_READ request for @page_count pages of the object described
+ * by @lsm and register a bulk descriptor to receive the data.
+ *
+ * Every page in @pga is kmap()ed and added to the descriptor under a single
+ * xid taken from the connection.  brw_finish is installed as the descriptor
+ * callback, with a freshly allocated osc_brw_cb_data carrying @callback and
+ * @data, and data->desc is pointed at the descriptor.  Once the request has
+ * completed without error, @callback is invoked with CB_PHASE_START and its
+ * result is returned.
+ *
+ * Returns -ENOMEM on allocation failure, the error from registering the bulk
+ * or from the RPC, or else whatever @callback returns.
+ */
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+ obd_count page_count, struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *data)
+{
+ struct ptlrpc_connection *connection =
+ client_conn2cli(conn)->cl_import.imp_connection;
+ struct ptlrpc_request *request = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
- struct list_head *tmp;
- int pages, rc, i, j, size[3] = {sizeof(*body)};
- void *ptr1, *ptr2;
- struct ptlrpc_bulk_desc *desc;
+ struct osc_brw_cb_data *cb_data = NULL;
+ int rc, size[3] = {sizeof(*body)};
+ void *iooptr, *nioptr;
+ int mapped = 0;
+ __u32 xid;
ENTRY;
- size[1] = num_oa * sizeof(struct obd_ioobj);
- pages = 0;
- for (i = 0; i < num_oa; i++)
- pages += oa_bufs[i];
- size[2] = pages * sizeof(struct niobuf_remote);
+ /* Request layout: [0] ost_body, [1] one obd_ioobj, [2] one
+ * niobuf_remote per page. */
+ size[1] = sizeof(struct obd_ioobj);
+ size[2] = page_count * sizeof(struct niobuf_remote);
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
+ NULL);
if (!request)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
+ /* NOTE(review): body is assigned here but not read again in the lines
+ * visible in this hunk. */
body = lustre_msg_buf(request->rq_reqmsg, 0);
- body->data = OBD_BRW_READ;
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out2, rc = -ENOMEM);
- desc->b_portal = OST_BULK_PORTAL;
-
- ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
- ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
- for (pages = 0, i = 0; i < num_oa; i++) {
- ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
- for (j = 0; j < oa_bufs[i]; j++, pages++) {
- struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out3, rc = -ENOMEM);
-
- spin_lock(&connection->c_lock);
- bulk->b_xid = ++connection->c_xid_out;
- spin_unlock(&connection->c_lock);
-
- bulk->b_buf = kmap(buf[pages]);
- bulk->b_page = buf[pages];
- bulk->b_buflen = PAGE_SIZE;
- ost_pack_niobuf(&ptr2, offset[pages], count[pages],
- flags[pages], bulk->b_xid);
- }
+ GOTO(out_req, rc = -ENOMEM);
+ desc->bd_portal = OST_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ if (!cb_data)
+ GOTO(out_desc, rc = -ENOMEM);
+
+ cb_data->callback = callback;
+ cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+ data->desc = desc;
+ desc->bd_cb_data = cb_data;
+
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+ ost_pack_ioo(&iooptr, lsm, page_count);
+ /* end almost identical to brw_write case */
+
+ spin_lock(&connection->c_lock);
+ xid = ++connection->c_xid_out; /* single xid for all pages */
+ spin_unlock(&connection->c_lock);
+
+ /* 'mapped' doubles as the count of pages kmap()ed so far, which is what
+ * the out_unmap path unwinds. */
+ for (mapped = 0; mapped < page_count; mapped++) {
+ struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(out_unmap, rc = -ENOMEM);
+
+ bulk->bp_xid = xid; /* single xid for all pages */
+
+ bulk->bp_buf = kmap(pga[mapped].pg);
+ bulk->bp_page = pga[mapped].pg;
+ bulk->bp_buflen = PAGE_SIZE;
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, bulk->bp_xid);
}
- rc = ptlrpc_register_bulk(desc);
- if (rc)
- GOTO(out3, rc);
+ /*
+ * Register the bulk first, because the reply could arrive out of order,
+ * and we want to be ready for the bulk data.
+ *
+ * The reference is released when brw_finish is complete.
+ *
+ * On error, we never do the brw_finish, so we handle all decrefs.
+ */
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
+ CERROR("obd_fail_loc=%x, skipping register_bulk\n",
+ OBD_FAIL_OSC_BRW_READ_BULK);
+ } else {
+ rc = ptlrpc_register_bulk(desc);
+ if (rc)
+ GOTO(out_unmap, rc);
+ }
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- if (rc)
- ptlrpc_abort_bulk(desc);
- GOTO(out3, rc);
- out3:
- list_for_each(tmp, &desc->b_page_list) {
- struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- if (bulk->b_buf != NULL)
- kunmap(bulk->b_buf);
- }
- ptlrpc_free_bulk(desc);
- out2:
- ptlrpc_free_req(request);
- out:
- return rc;
+ /*
+ * XXX: If there is an error during the processing of the callback,
+ * such as a timeout in a sleep that it performs, brw_finish
+ * will never get called, and we'll leak the desc, fail to kunmap
+ * things, cats will live with dogs. One solution would be to
+ * export brw_finish as osc_brw_finish, so that the timeout case
+ * and its kin could call it for proper cleanup. An alternative
+ * would be for an error return from the callback to cause us to
+ * clean up, but that doesn't help the truly async cases (like
+ * LOV), which will immediately return from their PHASE_START
+ * callback, before any such cleanup-requiring error condition can
+ * be detected.
+ */
+ /* NOTE(review): an RPC failure jumps straight to out_req, skipping
+ * out_unmap/out_desc, so the kmaps, cb_data and the desc reference are
+ * not released on this path -- presumably left to the bulk machinery;
+ * confirm. */
+ if (rc)
+ GOTO(out_req, rc);
+
+ /* Callbacks cause asynchronous handling. */
+ rc = callback(data, 0, CB_PHASE_START);
+
+out_req:
+ ptlrpc_req_finished(request);
+ RETURN(rc);
+
+ /* Clean up on error. */
+out_unmap:
+ while (mapped-- > 0)
+ kunmap(pga[mapped].pg);
+ OBD_FREE(cb_data, sizeof(*cb_data));
+out_desc:
+ ptlrpc_bulk_decref(desc);
+ goto out_req;
}
-static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
- struct obdo **oa, obd_count *oa_bufs,
- struct page **buf, obd_size *count, obd_off *offset,
- obd_flag *flags)
+/*
+ * Issue one OST_WRITE request for @page_count pages and, once the OST has
+ * replied with its remote niobufs, push the pages with ptlrpc_send_bulk().
+ *
+ * Each page in @pga is kmap()ed and recorded in a niobuf_local array; that
+ * array is stored in the osc_brw_cb_data so brw_finish frees it.  The reply's
+ * second buffer must be exactly page_count niobuf_remote entries; each one
+ * supplies the xid for the matching bulk page.  After the bulk has been sent
+ * without error, @callback is invoked with CB_PHASE_START and its result is
+ * returned.
+ *
+ * Returns -ENOMEM on allocation failure, -EINVAL for a missing or mis-sized
+ * reply buffer, the RPC or bulk-send error, or whatever @callback returns.
+ */
+static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
+ obd_count page_count, struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct ptlrpc_request *request;
- struct ptlrpc_bulk_desc *desc;
- struct obd_ioobj ioo;
+ struct ptlrpc_connection *connection =
+ client_conn2cli(conn)->cl_import.imp_connection;
+ struct ptlrpc_request *request = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
- struct niobuf_local *local;
+ struct niobuf_local *local = NULL;
struct niobuf_remote *remote;
- long pages;
- int rc, i, j, size[3] = {sizeof(*body)};
- void *ptr1, *ptr2;
+ struct osc_brw_cb_data *cb_data = NULL;
+ int rc, j, size[3] = {sizeof(*body)};
+ void *iooptr, *nioptr;
+ int mapped = 0;
ENTRY;
- size[1] = num_oa * sizeof(ioo);
- pages = 0;
- for (i = 0; i < num_oa; i++)
- pages += oa_bufs[i];
- size[2] = pages * sizeof(*remote);
+ /* Request layout: [0] ost_body, [1] one obd_ioobj, [2] one
+ * niobuf_remote per page. */
+ size[1] = sizeof(struct obd_ioobj);
+ size[2] = page_count * sizeof(*remote);
- OBD_ALLOC(local, pages * sizeof(*local));
- if (local == NULL)
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
+ NULL);
+ if (!request)
RETURN(-ENOMEM);
- osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
- if (!request)
- GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
- body->data = OBD_BRW_WRITE;
-
- ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
- ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
- for (pages = 0, i = 0; i < num_oa; i++) {
- ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
- for (j = 0; j < oa_bufs[i]; j++, pages++) {
- local[pages].addr = (__u64)(long)kmap(buf[pages]);
- local[pages].offset = offset[pages];
- local[pages].len = count[pages];
- ost_pack_niobuf(&ptr2, offset[pages], count[pages],
- flags[pages], 0);
- }
+
+ desc = ptlrpc_prep_bulk(connection);
+ if (!desc)
+ GOTO(out_req, rc = -ENOMEM);
+ desc->bd_portal = OSC_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ if (!cb_data)
+ GOTO(out_desc, rc = -ENOMEM);
+
+ cb_data->callback = callback;
+ cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+ data->desc = desc;
+ desc->bd_cb_data = cb_data;
+
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+ ost_pack_ioo(&iooptr, md, page_count);
+ /* end almost identical to brw_read case */
+
+ OBD_ALLOC(local, page_count * sizeof(*local));
+ if (!local)
+ GOTO(out_cb, rc = -ENOMEM);
+
+ /* Recorded so that brw_finish frees the array on completion. */
+ cb_data->obd_data = local;
+ cb_data->obd_size = page_count * sizeof(*local);
+
+ for (mapped = 0; mapped < page_count; mapped++) {
+ local[mapped].addr = kmap(pga[mapped].pg);
+
+ /* NOTE(review): page_count(...) below names the kernel's page
+ * refcount accessor while the parameter page_count shadows it;
+ * this presumably only resolves where page_count() is a
+ * function-like macro -- confirm for the targeted kernels. */
+ CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
+ "%d ; page %d of %d\n",
+ local[mapped].addr, pga[mapped].pg->flags,
+ page_count(pga[mapped].pg),
+ mapped, page_count - 1);
+
+ local[mapped].offset = pga[mapped].off;
+ local[mapped].len = pga[mapped].count;
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, 0);
}
- size[1] = pages * sizeof(struct niobuf_remote);
+ /* size[1] is reused as the expected length of reply buffer 1. */
+ size[1] = page_count * sizeof(*remote);
request->rq_replen = lustre_msg_size(2, size);
-
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out2, rc);
-
- ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
- if (ptr2 == NULL)
- GOTO(out2, rc = -EINVAL);
-
- if (request->rq_repmsg->buflens[1] !=
- pages * sizeof(struct niobuf_remote)) {
- CERROR("buffer length wrong (%d vs. %ld)\n",
- request->rq_repmsg->buflens[1],
- pages * sizeof(struct niobuf_remote));
- GOTO(out2, rc = -EINVAL);
+ GOTO(out_unmap, rc);
+
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ if (!nioptr)
+ GOTO(out_unmap, rc = -EINVAL);
+
+ if (request->rq_repmsg->buflens[1] != size[1]) {
+ CERROR("buffer length wrong (%d vs. %d)\n",
+ request->rq_repmsg->buflens[1], size[1]);
+ GOTO(out_unmap, rc = -EINVAL);
}
- desc = ptlrpc_prep_bulk(connection);
- desc->b_portal = OSC_BULK_PORTAL;
-
- for (pages = 0, i = 0; i < num_oa; i++) {
- for (j = 0; j < oa_bufs[i]; j++, pages++) {
- ost_unpack_niobuf(&ptr2, &remote);
- rc = osc_sendpage(desc, remote, &local[pages]);
- if (rc)
- GOTO(out3, rc);
- }
+ /* Pair each local page with the remote niobuf the OST returned. */
+ for (j = 0; j < page_count; j++) {
+ struct ptlrpc_bulk_page *bulk;
+
+ ost_unpack_niobuf(&nioptr, &remote);
+
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (!bulk)
+ GOTO(out_unmap, rc = -ENOMEM);
+
+ bulk->bp_buf = (void *)(unsigned long)local[j].addr;
+ bulk->bp_buflen = local[j].len;
+ bulk->bp_xid = remote->xid;
+ bulk->bp_page = pga[j].pg;
}
+ if (desc->bd_page_count != page_count)
+ LBUG();
+
+ /* Fault injection: clean up as on error but report success. */
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
+ GOTO(out_unmap, rc = 0);
+
+ /* Our reference is released when brw_finish is complete. */
rc = ptlrpc_send_bulk(desc);
- GOTO(out3, rc);
-
- out3:
- ptlrpc_free_bulk(desc);
- out2:
- ptlrpc_free_req(request);
- for (pages = 0, i = 0; i < num_oa; i++)
- for (j = 0; j < oa_bufs[i]; j++, pages++)
- kunmap(buf[pages]);
- out:
- OBD_FREE(local, pages * sizeof(*local));
- return rc;
+ /* XXX: Mike, same question as in osc_brw_read. */
+ /* NOTE(review): a send failure goes to out_req, not out_unmap, so the
+ * kmaps, local[], cb_data and the desc reference are not released on
+ * this path -- confirm that is intended. */
+ if (rc)
+ GOTO(out_req, rc);
+
+ /* Callbacks cause asynchronous handling. */
+ rc = callback(data, 0, CB_PHASE_START);
+
+out_req:
+ ptlrpc_req_finished(request);
+ RETURN(rc);
+
+ /* Clean up on error. */
+out_unmap:
+ while (mapped-- > 0)
+ kunmap(pga[mapped].pg);
+
+ OBD_FREE(local, page_count * sizeof(*local));
+out_cb:
+ OBD_FREE(cb_data, sizeof(*cb_data));
+out_desc:
+ ptlrpc_bulk_decref(desc);
+ goto out_req;
}
-static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
- struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+/*
+ * Bulk read/write entry point.
+ *
+ * Splits @pga into chunks of at most PTL_MD_MAX_IOV pages and hands each
+ * chunk to osc_brw_write (when OBD_BRW_WRITE is set in @cmd) or to
+ * osc_brw_read.  Stops at the first chunk that fails and returns its error;
+ * returns 0 when every chunk was submitted, including when @page_count is 0.
+ */
+static int osc_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *md, obd_count page_count,
+ struct brw_page *pga, brw_callback_t callback,
+ struct io_cb_data *data)
{
- if (rw == OBD_BRW_READ)
- return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
- else
- return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
+ obd_count chunk;
+ int rc;
+ ENTRY;
+
+ while (page_count != 0) {
+ /* Never submit more pages than one request can carry. */
+ chunk = page_count;
+ if (chunk > PTL_MD_MAX_IOV)
+ chunk = PTL_MD_MAX_IOV;
+
+ if (cmd & OBD_BRW_WRITE)
+ rc = osc_brw_write(conn, md, chunk, pga, callback, data);
+ else
+ rc = osc_brw_read(conn, md, chunk, pga, callback, data);
+ if (rc != 0)
+ RETURN(rc);
+
+ pga += chunk;
+ page_count -= chunk;
+ }
+ RETURN(0);
}
-static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
- struct ldlm_handle *parent_lock, __u64 *res_id,
- __u32 type, struct ldlm_extent *extent, __u32 mode,
- int *flags, void *data, int datalen,
- struct ldlm_handle *lockh)
+/*
+ * Obtain an extent lock on the object described by @lsm.
+ *
+ * @extentp points to a struct ldlm_extent.  Unless its end is
+ * OBD_OBJECT_EOF, the extent is widened in place to page boundaries.  An
+ * already-granted lock is reused when one matches: first in the requested
+ * @mode, then, for LCK_PR requests only, an existing LCK_PW lock (the
+ * reference is switched from PW to PR).  Otherwise a new lock is enqueued
+ * with ldlm_cli_enqueue().
+ *
+ * The extent length handed to the LDLM is sizeof(*extent), the size of the
+ * structure itself.  The code used to pass sizeof(extent), which is only the
+ * size of the pointer.  @extent_len is not used by this function.
+ *
+ * Returns ELDLM_OK when an existing lock was matched, otherwise the result
+ * of ldlm_cli_enqueue(); @lockh receives the lock handle.
+ */
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
+ struct lustre_handle *parent_lock,
+ __u32 type, void *extentp, int extent_len, __u32 mode,
+ int *flags, void *callback, void *data, int datalen,
+ struct lustre_handle *lockh)
{
- struct ptlrpc_connection *conn;
- struct ptlrpc_client *cl;
+ __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
+ struct obd_device *obddev = class_conn2obd(connh);
+ struct ldlm_extent *extent = extentp;
int rc;
- __u32 mode2;
+ ENTRY;
- /* Filesystem locks are given a bit of special treatment: first we
+ /* Filesystem locks are given a bit of special treatment: if
+ * this is not a file size lock (which has end == -1), we
* fixup the lock to start and end on page boundaries. */
- extent->start &= PAGE_MASK;
- extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ /* NOTE(review): if start/end are 64-bit and PAGE_MASK is a 32-bit
+ * unsigned long, these masks would also clear the upper 32 bits --
+ * confirm the field types. */
+ if (extent->end != OBD_OBJECT_EOF) {
+ extent->start &= PAGE_MASK;
+ extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ }
/* Next, search for already existing extent locks that will cover us */
- osc_con2dlmcl(oconn, &cl, &conn);
- rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
- if (rc == 1) {
+ rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
+ sizeof(*extent), mode, lockh);
+ if (rc == 1)
/* We already have a lock, and it's referenced */
- return 0;
- }
-
- /* Next, search for locks that we can upgrade (if we're trying to write)
- * or are more than we need (if we're trying to read). Because the VFS
- * and page cache already protect us locally, lots of readers/writers
- * can share a single PW lock. */
- if (mode == LCK_PW)
- mode2 = LCK_PR;
- else
- mode2 = LCK_PW;
-
- rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
- if (rc == 1) {
- int flags;
- struct ldlm_lock *lock = ldlm_handle2object(lockh);
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- ldlm_lock_addref(lock, mode);
- ldlm_lock_decref(lock, mode2);
-
- if (mode == LCK_PR)
- return 0;
-
- rc = ldlm_cli_convert(cl, lockh, type, &flags);
- if (rc)
- LBUG();
-
- return rc;
+ RETURN(ELDLM_OK);
+
+ /* If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too. */
+
+ if (mode == LCK_PR) {
+ rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
+ extent, sizeof(*extent), LCK_PW, lockh);
+ if (rc == 1) {
+ /* FIXME: This is not incredibly elegant, but it might
+ * be more elegant than adding another parameter to
+ * lock_match. I want a second opinion. */
+ ldlm_lock_addref(lockh, LCK_PR);
+ ldlm_lock_decref(lockh, LCK_PW);
+
+ RETURN(ELDLM_OK);
+ }
}
- rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
- extent, mode, flags, data, datalen, lockh);
- return rc;
+ rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
+ res_id, type, extent, sizeof(*extent), mode, flags,
+ ldlm_completion_ast, callback, data, datalen,
+ lockh);
+ RETURN(rc);
}
-static int osc_cancel(struct obd_conn *oconn, __u32 mode,
- struct ldlm_handle *lockh)
+/*
+ * Release one reference of the given @mode on the lock identified by @lockh.
+ * @oconn and @md are not used here.  Always returns 0.
+ */
+static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
+ __u32 mode, struct lustre_handle *lockh)
{
- struct ldlm_lock *lock;
ENTRY;
- lock = ldlm_handle2object(lockh);
- ldlm_lock_decref(lock, mode);
+ ldlm_lock_decref(lockh, mode);
RETURN(0);
}
-static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
+/*
+ * Ask the LDLM to cancel unused locks on the resource named by the object id
+ * in @lsm, within the namespace of the device behind @connh.  Returns the
+ * result of ldlm_cli_cancel_unused().
+ */
+static int osc_cancel_unused(struct lustre_handle *connh,
+ struct lov_stripe_md *lsm, int flags)
{
- struct osc_obd *osc = &obddev->u.osc;
- int rc;
- ENTRY;
+ struct obd_device *obd = class_conn2obd(connh);
+ /* Only the first element of the resource name is set; the rest are 0. */
+ __u64 name[RES_NAME_SIZE] = { lsm->lsm_object_id };
+ int rc;
- osc->osc_conn = ptlrpc_uuid_to_connection("ost");
- if (!osc->osc_conn)
- RETURN(-EINVAL);
+ rc = ldlm_cli_cancel_unused(obd->obd_namespace, name, flags);
+ return rc;
+}
+
+/*
+ * Fetch filesystem statistics from the OST with an OST_STATFS request.
+ *
+ * The request carries no buffers; the reply is expected to hold a single
+ * buffer of sizeof(*osfs) bytes, which is unpacked into @osfs on success.
+ * Returns -ENOMEM if the request cannot be allocated, otherwise the status
+ * produced by ptlrpc_check_status().
+ */
+static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+{
+ struct ptlrpc_request *request;
+ int rc, size = sizeof(*osfs);
+ ENTRY;
- OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
- if (osc->osc_client == NULL)
- GOTO(out_conn, rc = -ENOMEM);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
+ NULL);
+ if (!request)
+ RETURN(-ENOMEM);
- OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- if (osc->osc_ldlm_client == NULL)
- GOTO(out_client, rc = -ENOMEM);
+ request->rq_replen = lustre_msg_size(1, &size);
- ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- osc->osc_client);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- osc->osc_ldlm_client);
- osc->osc_client->cli_name = "osc";
- osc->osc_ldlm_client->cli_name = "ldlm";
+ rc = ptlrpc_queue_wait(request);
+ rc = ptlrpc_check_status(request, rc);
+ if (rc) {
+ CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
+ GOTO(out, rc);
+ }
- MOD_INC_USE_COUNT;
- RETURN(0);
+ /* NOTE(review): the reply buffer pointer is passed on without a NULL
+ * check -- presumably guaranteed once check_status succeeds; confirm. */
+ obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
- out_client:
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- out_conn:
- ptlrpc_put_connection(osc->osc_conn);
+ EXIT;
+ out:
+ ptlrpc_req_finished(request);
return rc;
}
-static int osc_cleanup(struct obd_device * obddev)
+/*
+ * Read the next unsigned integer from *cursor for the LDLM regression ioctl.
+ *
+ * If *cursor already sits on the terminating NUL, @fallback is returned and
+ * the cursor is left alone.  Otherwise leading whitespace is skipped, a
+ * number is parsed with simple_strtoul() (base 0), trailing whitespace is
+ * skipped, and the cursor is advanced past all of it.
+ */
+static unsigned int osc_next_uint(char **cursor, unsigned int fallback)
+{
+ char *p = *cursor;
+ unsigned int value;
+
+ if (*p == '\0')
+ return fallback;
+
+ while (isspace(*p))
+ p++;
+ value = simple_strtoul(p, &p, 0);
+ while (isspace(*p))
+ p++;
+
+ *cursor = p;
+ return value;
+}
+
+/*
+ * ioctl dispatcher for the OSC device behind @conn.
+ *
+ * Handles the LDLM test and regression start/stop commands and
+ * IOC_OSC_REGISTER_LOV, which records @karg as the containing LOV device
+ * unless one is already set (-EALREADY).  For IOC_LDLM_REGRESS_START the
+ * optional inline buffer supplies up to four whitespace-separated numbers
+ * (threads, held, resources, extents) overriding the defaults 1/10/10/10.
+ * Unknown commands yield -ENOTTY.  @len and @uarg are not used.
+ */
+static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
+ void *karg, void *uarg)
{
- struct osc_obd *osc = &obddev->u.osc;
+ struct obd_device *obd = class_conn2obd(conn);
+ struct obd_ioctl_data *ioc = karg;
+ int err = 0;
+ ENTRY;
- ptlrpc_cleanup_client(osc->osc_client);
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- ptlrpc_cleanup_client(osc->osc_ldlm_client);
- OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- ptlrpc_put_connection(osc->osc_conn);
+ switch (cmd) {
+ case IOC_LDLM_TEST: {
+ err = ldlm_test(obd, conn);
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_LDLM_REGRESS_START: {
+ unsigned int numthreads = 1;
+ unsigned int numheld = 10;
+ unsigned int numres = 10;
+ unsigned int numext = 10;
+
+ if (ioc->ioc_inllen1) {
+ char *cursor = ioc->ioc_inlbuf1;
+
+ /* Each value keeps its default once the string runs out. */
+ numthreads = osc_next_uint(&cursor, numthreads);
+ numheld = osc_next_uint(&cursor, numheld);
+ numres = osc_next_uint(&cursor, numres);
+ numext = osc_next_uint(&cursor, numext);
+ }
- MOD_DEC_USE_COUNT;
- return 0;
+ err = ldlm_regression_start(obd, conn, numthreads,
+ numheld, numres, numext);
+
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_LDLM_REGRESS_STOP: {
+ err = ldlm_regression_stop();
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_OSC_REGISTER_LOV: {
+ if (obd->u.cli.cl_containing_lov)
+ GOTO(out, err = -EALREADY);
+ obd->u.cli.cl_containing_lov = (struct obd_device *)karg;
+ GOTO(out, err);
+ }
+
+ default:
+ GOTO(out, err = -ENOTTY);
+ }
+out:
+ return err;
}
+/*
+ * Attach method: register the device's lprocfs entries from
+ * status_var_nm_1, passing the device itself as the private data.  @len and
+ * @data are not used.  Returns the result of lprocfs_reg_obd().
+ */
+int osc_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ return lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+}
+
+/*
+ * Detach method: remove the device's lprocfs entries.  Returns the result
+ * of lprocfs_dereg_obd().
+ */
+int osc_detach(struct obd_device *dev)
+{
+ return lprocfs_dereg_obd(dev);
+}
+/*
+ * Method table for the OSC obd type; registered by osc_init.  Setup,
+ * cleanup, connect and disconnect use the generic client_obd_* routines.
+ */
struct obd_ops osc_obd_ops = {
- o_setup: osc_setup,
- o_cleanup: osc_cleanup,
- o_create: osc_create,
- o_destroy: osc_destroy,
- o_getattr: osc_getattr,
- o_setattr: osc_setattr,
- o_open: osc_open,
- o_close: osc_close,
- o_connect: osc_connect,
- o_disconnect: osc_disconnect,
- o_brw: osc_brw,
- o_punch: osc_punch,
- o_enqueue: osc_enqueue,
- o_cancel: osc_cancel
+ .o_attach = osc_attach,
+ .o_detach = osc_detach,
+ .o_setup = client_obd_setup,
+ .o_cleanup = client_obd_cleanup,
+ .o_statfs = osc_statfs,
+ .o_create = osc_create,
+ .o_destroy = osc_destroy,
+ .o_getattr = osc_getattr,
+ .o_setattr = osc_setattr,
+ .o_open = osc_open,
+ .o_close = osc_close,
+ .o_connect = client_obd_connect,
+ .o_disconnect = client_obd_disconnect,
+ .o_brw = osc_brw,
+ .o_punch = osc_punch,
+ .o_enqueue = osc_enqueue,
+ .o_cancel = osc_cancel,
+ .o_cancel_unused = osc_cancel_unused,
+ .o_iocontrol = osc_iocontrol
};
+/*
+ * Init routine (marked __init): register the OSC obd type, with its method
+ * table and class-level lprocfs variables, under LUSTRE_OSC_NAME.  Returns
+ * the registration error, or 0 on success.
+ */
static int __init osc_init(void)
{
- obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
+ int rc;
+
+ rc = class_register_type(&osc_obd_ops,
+ (lprocfs_vars_t*)status_class_var,
+ LUSTRE_OSC_NAME);
+ /* NOTE(review): RETURN() is used here without a matching ENTRY. */
+ if (rc)
+ RETURN(rc);
return 0;
+
}
+/* Exit routine (marked __exit): unregister the type registered by osc_init. */
static void __exit osc_exit(void)
{
- obd_unregister_type(LUSTRE_OSC_NAME);
+ class_unregister_type(LUSTRE_OSC_NAME);
}
-MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
MODULE_LICENSE("GPL");