#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/version.h>
#include <linux/module.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/workqueue.h>
+#endif
+#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
-
-static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection,
- struct lustre_handle **rconn)
-{
- struct obd_export *export = class_conn2export(conn);
- struct osc_obd *osc = &export->exp_obd->u.osc;
-
- *cl = osc->osc_client;
- *connection = osc->osc_conn;
- *rconn = &export->exp_rconnh;
-}
-
-static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection,
- struct lustre_handle **rconn)
+#include <linux/ctype.h>
+#include <linux/init.h>
+#include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
+#include <linux/lustre_lite.h> /* for ll_i2info */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
+
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- struct obd_export *export = class_conn2export(conn);
- struct osc_obd *osc = &export->exp_obd->u.osc;
-
- *cl = osc->osc_ldlm_client;
- *connection = osc->osc_conn;
- *rconn = &export->exp_rconnh;
-}
-
-static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
- struct osc_obd *osc = &obd->u.osc;
- struct obd_import *import;
struct ptlrpc_request *request;
- char *tmp = osc->osc_target_uuid;
- int rc, size = sizeof(osc->osc_target_uuid);
- ENTRY;
-
- OBD_ALLOC(import, sizeof(*import));
- if (!import)
- RETURN(-ENOMEM);
-
- MOD_INC_USE_COUNT;
- rc = class_connect(conn, obd);
- if (rc)
- RETURN(rc);
-
- request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
- OST_CONNECT, 1, &size, &tmp);
- if (!request)
- GOTO(out_disco, rc = -ENOMEM);
-
- request->rq_level = LUSTRE_CONN_NEW;
- request->rq_replen = lustre_msg_size(0, NULL);
- request->rq_reqmsg->addr = -1;
- /* Sending our local connection info breaks for local connections
- request->rq_reqmsg->addr = conn->addr;
- request->rq_reqmsg->cookie = conn->cookie;
- */
-
- rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
- if (rc) {
- CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
- GOTO(out, rc);
- }
-
- /* XXX eventually maybe more refinement */
- osc->osc_conn->c_level = LUSTRE_CONN_FULL;
-
- class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
-
- EXIT;
- out:
- ptlrpc_free_req(request);
- out_disco:
- if (rc) {
- class_disconnect(conn);
- MOD_DEC_USE_COUNT;
- }
- return rc;
-}
-
-static int osc_disconnect(struct lustre_handle *conn)
-{
- struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- int rc;
- ENTRY;
-
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_DISCONNECT, 0, NULL, NULL);
- if (!request)
- RETURN(-ENOMEM);
- request->rq_replen = lustre_msg_size(0, NULL);
-
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
- rc = class_disconnect(conn);
- if (!rc)
- MOD_DEC_USE_COUNT;
-
- out:
- ptlrpc_free_req(request);
- return rc;
-}
-
-static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
-{
- struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_GETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_valid = ~0;
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_OPEN, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_CLOSE, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
- oa->o_id = md->lmd_object_id;
- oa->o_mode = S_IFREG;
- oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
-static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_SETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- GOTO(out, rc);
- out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
struct lov_stripe_md **ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
+ struct lov_stripe_md *lsm;
int rc, size = sizeof(*body);
ENTRY;
- if (!oa) {
- CERROR("oa NULL\n");
- RETURN(-EINVAL);
- }
-
- if (!ea) {
- LBUG();
- }
+ LASSERT(oa);
+ LASSERT(ea);
- if (!*ea) {
- OBD_ALLOC(*ea, oa->o_easize);
- if (!*ea)
+ lsm = *ea;
+ if (!lsm) {
+ // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
+ OBD_ALLOC(lsm, oa->o_easize);
+ if (!lsm)
RETURN(-ENOMEM);
- (*ea)->lmd_size = oa->o_easize;
+ lsm->lsm_mds_easize = oa->o_easize;
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_CREATE, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
+ NULL);
if (!request)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out, rc);
+ GOTO(out_req, rc);
body = lustre_msg_buf(request->rq_repmsg, 0);
memcpy(oa, &body->oa, sizeof(*oa));
- (*ea)->lmd_object_id = oa->o_id;
- (*ea)->lmd_stripe_count = 1;
+ lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_stripe_count = 0;
+ *ea = lsm;
EXIT;
- out:
- ptlrpc_free_req(request);
+out_req:
+ ptlrpc_req_finished(request);
+out:
+ if (rc && !*ea)
+ OBD_FREE(lsm, oa->o_easize);
return rc;
}
static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md, obd_size count,
- obd_off offset)
+ struct lov_stripe_md *md, obd_size start,
+ obd_size end)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_PUNCH, 1, &size, NULL);
+
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_blocks = count;
- body->oa.o_valid |= OBD_MD_FLBLOCKS;
+
+ /* overload the size and blocks fields in the oa with start/end */
+ body->oa.o_size = HTON__u64(start);
+ body->oa.o_blocks = HTON__u64(end);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
struct lov_stripe_md *ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_DESTROY, 1, &size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_valid = ~0;
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
struct osc_brw_cb_data {
- bulk_callback_t callback;
+ brw_callback_t callback;
void *cb_data;
void *obd_data;
size_t obd_size;
};
-static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
+/* Our bulk-unmapping bottom half. */
+static void unmap_and_decref_bulk_desc(void *data)
{
- struct list_head *tmp, *next;
- struct osc_brw_cb_data *cb_data = data;
+ struct ptlrpc_bulk_desc *desc = data;
+ struct list_head *tmp;
ENTRY;
- if (desc->b_flags & PTL_RPC_FL_INTR)
- CERROR("got signal\n");
-
/* This feels wrong to me. */
- list_for_each_safe(tmp, next, &desc->b_page_list) {
+ list_for_each(tmp, &desc->bd_page_list) {
struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
+
+ kunmap(bulk->bp_page);
+ }
+
+ ptlrpc_bulk_decref(desc);
+ EXIT;
+}
+
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+ struct osc_brw_cb_data *cb_data = data;
+ int err = 0;
+ ENTRY;
- kunmap(bulk->b_page);
+ if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
+ err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
+ -ETIMEDOUT);
}
if (cb_data->callback)
- (cb_data->callback)(desc, cb_data->cb_data);
+ cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
- ptlrpc_bulk_decref(desc);
if (cb_data->obd_data)
OBD_FREE(cb_data->obd_data, cb_data->obd_size);
OBD_FREE(cb_data, sizeof(*cb_data));
+
+ /* We can't kunmap the desc from interrupt context, so we do it from
+ * the bottom half above. */
+ prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
+ schedule_work(&desc->bd_queue);
+
EXIT;
}
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
- obd_count page_count, struct page **page_array,
- obd_size *count, obd_off *offset, obd_flag *flags,
- bulk_callback_t callback)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+ obd_count page_count, struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct ptlrpc_connection *connection =
+ client_conn2cli(conn)->cl_import.imp_connection;
struct ptlrpc_request *request = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
- int rc, j, size[3] = {sizeof(*body)};
- void *iooptr, *nioptr;
struct osc_brw_cb_data *cb_data = NULL;
+ int rc, size[3] = {sizeof(*body)};
+ void *iooptr, *nioptr;
+ int mapped = 0;
+ __u32 xid;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(struct niobuf_remote);
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_BRW, 3, size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
- body->data = OBD_BRW_READ;
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out_free, rc = -ENOMEM);
- desc->b_portal = OST_BULK_PORTAL;
- desc->b_cb = brw_finish;
+ GOTO(out_req, rc = -ENOMEM);
+ desc->bd_portal = OST_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_desc, rc = -ENOMEM);
+
cb_data->callback = callback;
- desc->b_cb_data = cb_data;
- /* XXX end almost identical to brw_write case */
+ cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+ data->desc = desc;
+ desc->bd_cb_data = cb_data;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
- for (j = 0; j < page_count; j++) {
- struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk_page(desc);
+ ost_pack_ioo(&iooptr, lsm, page_count);
+ /* end almost identical to brw_write case */
+
+ spin_lock(&connection->c_lock);
+ xid = ++connection->c_xid_out; /* single xid for all pages */
+ spin_unlock(&connection->c_lock);
+
+ for (mapped = 0; mapped < page_count; mapped++) {
+ struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
GOTO(out_unmap, rc = -ENOMEM);
- spin_lock(&connection->c_lock);
- bulk->b_xid = ++connection->c_xid_out;
- spin_unlock(&connection->c_lock);
+ bulk->bp_xid = xid; /* single xid for all pages */
- bulk->b_buf = kmap(page_array[j]);
- bulk->b_page = page_array[j];
- bulk->b_buflen = PAGE_SIZE;
- ost_pack_niobuf(&nioptr, offset[j], count[j],
- flags[j], bulk->b_xid);
+ bulk->bp_buf = kmap(pga[mapped].pg);
+ bulk->bp_page = pga[mapped].pg;
+ bulk->bp_buflen = PAGE_SIZE;
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, bulk->bp_xid);
}
/*
* Register the bulk first, because the reply could arrive out of order,
* and we want to be ready for the bulk data.
*
- * One reference is released by the bulk callback, the other when
- * we finish sleeping on it (if we don't have a callback).
+ * The reference is released when brw_finish is complete.
+ *
+ * On error, we never do the brw_finish, so we handle all decrefs.
*/
- atomic_set(&desc->b_refcount, callback ? 1 : 2);
- rc = ptlrpc_register_bulk(desc);
- if (rc)
- GOTO(out_unmap, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
+ CERROR("obd_fail_loc=%x, skipping register_bulk\n",
+ OBD_FAIL_OSC_BRW_READ_BULK);
+ } else {
+ rc = ptlrpc_register_bulk(desc);
+ if (rc)
+ GOTO(out_unmap, rc);
+ }
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- if (rc) {
- ptlrpc_bulk_decref(desc);
- GOTO(out_unmap, rc);
- }
- /* Callbacks cause asynchronous handling. */
- if (callback)
- RETURN(0);
+ /*
+ * XXX: If there is an error during the processing of the callback,
+ * such as a timeout in a sleep that it performs, brw_finish
+ * will never get called, and we'll leak the desc, fail to kunmap
+ * things, cats will live with dogs. One solution would be to
+ * export brw_finish as osc_brw_finish, so that the timeout case
+ * and its kin could call it for proper cleanup. An alternative
+ * would be for an error return from the callback to cause us to
+ * clean up, but that doesn't help the truly async cases (like
+ * LOV), which will immediately return from their PHASE_START
+ * callback, before any such cleanup-requiring error condition can
+ * be detected.
+ */
+ if (rc)
+ GOTO(out_req, rc);
- l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
- ptlrpc_bulk_decref(desc);
- if (desc->b_flags & PTL_RPC_FL_INTR)
- RETURN(-EINTR);
+ /* Callbacks cause asynchronous handling. */
+ rc = callback(data, 0, CB_PHASE_START);
- RETURN(0);
+out_req:
+ ptlrpc_req_finished(request);
+ RETURN(rc);
/* Clean up on error. */
- out_unmap:
- for (j = 0; j < desc->b_page_count; j++)
- kunmap(page_array[j]);
- out_free:
- if (cb_data)
- OBD_FREE(cb_data, sizeof(*cb_data));
- ptlrpc_free_bulk(desc);
- ptlrpc_free_req(request);
- return rc;
+out_unmap:
+ while (mapped-- > 0)
+ kunmap(pga[mapped].pg);
+ OBD_FREE(cb_data, sizeof(*cb_data));
+out_desc:
+ ptlrpc_bulk_decref(desc);
+ goto out_req;
}
-static int osc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md, obd_count page_count,
- struct page **pagearray, obd_size *count,
- obd_off *offset, obd_flag *flags,
- bulk_callback_t callback)
+static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
+ obd_count page_count, struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct ptlrpc_connection *connection =
+ client_conn2cli(conn)->cl_import.imp_connection;
struct ptlrpc_request *request = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
struct osc_brw_cb_data *cb_data = NULL;
int rc, j, size[3] = {sizeof(*body)};
void *iooptr, *nioptr;
+ int mapped = 0;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*remote);
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_BRW, 3, size, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
- body->data = OBD_BRW_WRITE;
-
- OBD_ALLOC(local, page_count * sizeof(*local));
- if (!local)
- GOTO(out_free, rc = -ENOMEM);
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out_free, rc = -ENOMEM);
- desc->b_portal = OSC_BULK_PORTAL;
- desc->b_cb = brw_finish;
+ GOTO(out_req, rc = -ENOMEM);
+ desc->bd_portal = OSC_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_desc, rc = -ENOMEM);
+
cb_data->callback = callback;
- desc->b_cb_data = cb_data;
- /* XXX end almost identical to brw_read case */
- cb_data->obd_data = local;
- cb_data->obd_size = page_count * sizeof(*local);
+ cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+ data->desc = desc;
+ desc->bd_cb_data = cb_data;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
ost_pack_ioo(&iooptr, md, page_count);
- for (j = 0; j < page_count; j++) {
- local[j].addr = kmap(pagearray[j]);
- local[j].offset = offset[j];
- local[j].len = count[j];
- ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
+ /* end almost identical to brw_read case */
+
+ OBD_ALLOC(local, page_count * sizeof(*local));
+ if (!local)
+ GOTO(out_cb, rc = -ENOMEM);
+
+ cb_data->obd_data = local;
+ cb_data->obd_size = page_count * sizeof(*local);
+
+ for (mapped = 0; mapped < page_count; mapped++) {
+ local[mapped].addr = kmap(pga[mapped].pg);
+
+ CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
+ "%d ; page %d of %d\n",
+ local[mapped].addr, pga[mapped].pg->flags,
+ page_count(pga[mapped].pg),
+ mapped, page_count - 1);
+
+ local[mapped].offset = pga[mapped].off;
+ local[mapped].len = pga[mapped].count;
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, 0);
}
- size[1] = page_count * sizeof(struct niobuf_remote);
+ size[1] = page_count * sizeof(*remote);
request->rq_replen = lustre_msg_size(2, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (!nioptr)
GOTO(out_unmap, rc = -EINVAL);
- if (request->rq_repmsg->buflens[1] !=
- page_count * sizeof(struct niobuf_remote)) {
+ if (request->rq_repmsg->buflens[1] != size[1]) {
CERROR("buffer length wrong (%d vs. %d)\n",
- request->rq_repmsg->buflens[1],
- page_count * sizeof(struct niobuf_remote));
+ request->rq_repmsg->buflens[1], size[1]);
GOTO(out_unmap, rc = -EINVAL);
}
for (j = 0; j < page_count; j++) {
- struct ptlrpc_bulk_page *page;
+ struct ptlrpc_bulk_page *bulk;
ost_unpack_niobuf(&nioptr, &remote);
- page = ptlrpc_prep_bulk_page(desc);
- if (!page)
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (!bulk)
GOTO(out_unmap, rc = -ENOMEM);
- page->b_buf = (void *)(unsigned long)local[j].addr;
- page->b_buflen = local[j].len;
- page->b_xid = remote->xid;
+ bulk->bp_buf = (void *)(unsigned long)local[j].addr;
+ bulk->bp_buflen = local[j].len;
+ bulk->bp_xid = remote->xid;
+ bulk->bp_page = pga[j].pg;
}
- if (desc->b_page_count != page_count)
+ if (desc->bd_page_count != page_count)
LBUG();
- /*
- * One is released when the bulk is complete, the other when we finish
- * waiting on it. (Callback cases don't sleep, so only one ref for
- * them.)
- */
- atomic_set(&desc->b_refcount, callback ? 1 : 2);
- CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
- atomic_read(&desc->b_refcount));
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
+ GOTO(out_unmap, rc = 0);
+
+ /* Our reference is released when brw_finish is complete. */
rc = ptlrpc_send_bulk(desc);
+
+ /* XXX: Mike, same question as in osc_brw_read. */
if (rc)
- GOTO(out_unmap, rc);
+ GOTO(out_req, rc);
/* Callbacks cause asynchronous handling. */
- if (callback)
- RETURN(0);
+ rc = callback(data, 0, CB_PHASE_START);
- /* If there's no callback function, sleep here until complete. */
- l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- ptlrpc_bulk_decref(desc);
- if (desc->b_flags & PTL_RPC_FL_INTR)
- RETURN(-EINTR);
- RETURN(0);
+out_req:
+ ptlrpc_req_finished(request);
+ RETURN(rc);
/* Clean up on error. */
- out_unmap:
- for (j = 0; j < page_count; j++)
- kunmap(pagearray[j]);
-
- out_free:
- if (cb_data)
- OBD_FREE(cb_data, sizeof(*cb_data));
- if (local)
- OBD_FREE(local, page_count * sizeof(*local));
- ptlrpc_free_bulk(desc);
- ptlrpc_req_finished(request);
- return rc;
+out_unmap:
+ while (mapped-- > 0)
+ kunmap(pga[mapped].pg);
+
+ OBD_FREE(local, page_count * sizeof(*local));
+out_cb:
+ OBD_FREE(cb_data, sizeof(*cb_data));
+out_desc:
+ ptlrpc_bulk_decref(desc);
+ goto out_req;
}
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
- struct page **page_array,
- obd_size *count,
- obd_off *offset,
- obd_flag *flags,
- void *callback)
+ struct brw_page *pga, brw_callback_t callback,
+ struct io_cb_data *data)
{
- if (cmd & OBD_BRW_WRITE)
- return osc_brw_write(conn, md, page_count, page_array, count,
- offset, flags, (bulk_callback_t)callback);
- else
- return osc_brw_read(conn, md, page_count, page_array, count,
- offset, flags, (bulk_callback_t)callback);
+ ENTRY;
+
+ while (page_count) {
+ obd_count pages_per_brw;
+ int rc;
+
+ if (page_count > PTL_MD_MAX_IOV)
+ pages_per_brw = PTL_MD_MAX_IOV;
+ else
+ pages_per_brw = page_count;
+
+ if (cmd & OBD_BRW_WRITE)
+ rc = osc_brw_write(conn, md, pages_per_brw, pga,
+ callback, data);
+ else
+ rc = osc_brw_read(conn, md, pages_per_brw, pga,
+ callback, data);
+
+ if (rc != 0)
+ RETURN(rc);
+
+ page_count -= pages_per_brw;
+ pga += pages_per_brw;
+ }
+ RETURN(0);
}
-static int osc_enqueue(struct lustre_handle *conn,
- struct lustre_handle *parent_lock, __u64 *res_id,
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
+ struct lustre_handle *parent_lock,
__u32 type, void *extentp, int extent_len, __u32 mode,
int *flags, void *callback, void *data, int datalen,
struct lustre_handle *lockh)
{
- struct obd_device *obddev = class_conn2obd(conn);
- struct ptlrpc_connection *connection;
- struct ptlrpc_client *cl;
- struct lustre_handle *rconn;
+ __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
+ struct obd_device *obddev = class_conn2obd(connh);
struct ldlm_extent *extent = extentp;
int rc;
- __u32 mode2;
+ ENTRY;
- /* Filesystem locks are given a bit of special treatment: first we
+ /* Filesystem locks are given a bit of special treatment: if
+ * this is not a file size lock (which has end == -1), we
* fixup the lock to start and end on page boundaries. */
- extent->start &= PAGE_MASK;
- extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ if (extent->end != OBD_OBJECT_EOF) {
+ extent->start &= PAGE_MASK;
+ extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ }
/* Next, search for already existing extent locks that will cover us */
- osc_con2dlmcl(conn, &cl, &connection, &rconn);
rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
sizeof(extent), mode, lockh);
- if (rc == 1) {
+ if (rc == 1)
/* We already have a lock, and it's referenced */
- return 0;
- }
+ RETURN(ELDLM_OK);
- /* Next, search for locks that we can upgrade (if we're trying to write)
- * or are more than we need (if we're trying to read). Because the VFS
- * and page cache already protect us locally, lots of readers/writers
- * can share a single PW lock. */
- if (mode == LCK_PW)
- mode2 = LCK_PR;
- else
- mode2 = LCK_PW;
-
- rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
- sizeof(extent), mode2, lockh);
- if (rc == 1) {
- int flags;
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- ldlm_lock_addref(lockh, mode);
- ldlm_lock_decref(lockh, mode2);
-
- if (mode == LCK_PR)
- return 0;
-
- rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
- if (rc)
- LBUG();
-
- return rc;
+ /* If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too. */
+
+ if (mode == LCK_PR) {
+ rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
+ extent, sizeof(extent), LCK_PW, lockh);
+ if (rc == 1) {
+ /* FIXME: This is not incredibly elegant, but it might
+ * be more elegant than adding another parameter to
+ * lock_match. I want a second opinion. */
+ ldlm_lock_addref(lockh, LCK_PR);
+ ldlm_lock_decref(lockh, LCK_PW);
+
+ RETURN(ELDLM_OK);
+ }
}
- rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
- parent_lock, res_id, type, extent, sizeof(extent),
- mode, flags, callback, data, datalen, lockh);
- return rc;
+ rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
+ res_id, type, extent, sizeof(extent), mode, flags,
+ ldlm_completion_ast, callback, data, datalen,
+ lockh);
+ RETURN(rc);
}
-static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
- struct lustre_handle *lockh)
+static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
+ __u32 mode, struct lustre_handle *lockh)
{
ENTRY;
RETURN(0);
}
-static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int osc_cancel_unused(struct lustre_handle *connh,
+ struct lov_stripe_md *lsm, int flags)
{
- struct obd_ioctl_data* data = buf;
- struct osc_obd *osc = &obddev->u.osc;
- char server_uuid[37];
- int rc;
- ENTRY;
-
- if (data->ioc_inllen1 < 1) {
- CERROR("osc setup requires a TARGET UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen1 > 37) {
- CERROR("osc TARGET UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 < 1) {
- CERROR("osc setup requires a SERVER UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 > 37) {
- CERROR("osc SERVER UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
- memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
- sizeof(server_uuid)));
-
- osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
- if (!osc->osc_conn)
- RETURN(-ENOENT);
-
- obddev->obd_namespace =
- ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
- if (obddev->obd_namespace == NULL)
- GOTO(out_conn, rc = -ENOMEM);
-
- OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
- if (osc->osc_client == NULL)
- GOTO(out_ns, rc = -ENOMEM);
-
- OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- if (osc->osc_ldlm_client == NULL)
- GOTO(out_client, rc = -ENOMEM);
-
- ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- osc->osc_client);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- osc->osc_ldlm_client);
- osc->osc_client->cli_name = "osc";
- osc->osc_ldlm_client->cli_name = "ldlm";
-
- MOD_INC_USE_COUNT;
- RETURN(0);
-
- out_client:
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- out_ns:
- ldlm_namespace_free(obddev->obd_namespace);
- out_conn:
- ptlrpc_put_connection(osc->osc_conn);
- return rc;
-}
-
-static int osc_cleanup(struct obd_device * obddev)
-{
- struct osc_obd *osc = &obddev->u.osc;
-
- ldlm_namespace_free(obddev->obd_namespace);
+ struct obd_device *obddev = class_conn2obd(connh);
+ __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
- ptlrpc_cleanup_client(osc->osc_client);
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- ptlrpc_cleanup_client(osc->osc_ldlm_client);
- OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- ptlrpc_put_connection(osc->osc_conn);
-
- MOD_DEC_USE_COUNT;
- return 0;
+ return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
}
-static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
+static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- struct obd_statfs *osfs;
int rc, size = sizeof(*osfs);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_STATFS, 0, NULL, NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
+ NULL);
if (!request)
RETURN(-ENOMEM);
GOTO(out, rc);
}
- osfs = lustre_msg_buf(request->rq_repmsg, 0);
- obd_statfs_unpack(osfs, sfs);
+ obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
+static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
+ void *karg, void *uarg)
+{
+ struct obd_device *obddev = class_conn2obd(conn);
+ struct obd_ioctl_data *data = karg;
+ int err = 0;
+ ENTRY;
+
+ switch (cmd) {
+ case IOC_LDLM_TEST: {
+ err = ldlm_test(obddev, conn);
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_LDLM_REGRESS_START: {
+ unsigned int numthreads = 1;
+ unsigned int numheld = 10;
+ unsigned int numres = 10;
+ unsigned int numext = 10;
+ char *parse;
+
+ if (data->ioc_inllen1) {
+ parse = data->ioc_inlbuf1;
+ if (*parse != '\0') {
+ while(isspace(*parse)) parse++;
+ numthreads = simple_strtoul(parse, &parse, 0);
+ while(isspace(*parse)) parse++;
+ }
+ if (*parse != '\0') {
+ while(isspace(*parse)) parse++;
+ numheld = simple_strtoul(parse, &parse, 0);
+ while(isspace(*parse)) parse++;
+ }
+ if (*parse != '\0') {
+ while(isspace(*parse)) parse++;
+ numres = simple_strtoul(parse, &parse, 0);
+ while(isspace(*parse)) parse++;
+ }
+ if (*parse != '\0') {
+ while(isspace(*parse)) parse++;
+ numext = simple_strtoul(parse, &parse, 0);
+ while(isspace(*parse)) parse++;
+ }
+ }
+
+ err = ldlm_regression_start(obddev, conn, numthreads,
+ numheld, numres, numext);
+
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_LDLM_REGRESS_STOP: {
+ err = ldlm_regression_stop();
+ CERROR("-- done err %d\n", err);
+ GOTO(out, err);
+ }
+ case IOC_OSC_REGISTER_LOV: {
+ if (obddev->u.cli.cl_containing_lov)
+ GOTO(out, err = -EALREADY);
+ obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
+ GOTO(out, err);
+ }
+
+ default:
+ GOTO(out, err = -ENOTTY);
+ }
+out:
+ return err;
+}
+
+int osc_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ int rc;
+ rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+ return rc;
+}
+
+int osc_detach(struct obd_device *dev)
+{
+ int rc;
+ rc = lprocfs_dereg_obd(dev);
+ return rc;
+
+}
struct obd_ops osc_obd_ops = {
- o_setup: osc_setup,
- o_cleanup: osc_cleanup,
+ o_attach: osc_attach,
+ o_detach: osc_detach,
+ o_setup: client_obd_setup,
+ o_cleanup: client_obd_cleanup,
o_statfs: osc_statfs,
o_create: osc_create,
o_destroy: osc_destroy,
o_setattr: osc_setattr,
o_open: osc_open,
o_close: osc_close,
- o_connect: osc_connect,
- o_disconnect: osc_disconnect,
+ o_connect: client_obd_connect,
+ o_disconnect: client_obd_disconnect,
o_brw: osc_brw,
o_punch: osc_punch,
o_enqueue: osc_enqueue,
- o_cancel: osc_cancel
+ o_cancel: osc_cancel,
+ o_cancel_unused: osc_cancel_unused,
+ o_iocontrol: osc_iocontrol
};
static int __init osc_init(void)
{
- return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
+ int rc;
+
+ rc = class_register_type(&osc_obd_ops,
+ (lprocfs_vars_t*)status_class_var,
+ LUSTRE_OSC_NAME);
+ if (rc)
+ RETURN(rc);
+ return 0;
+
}
static void __exit osc_exit(void)