#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/version.h>
#include <linux/module.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/workqueue.h>
+#endif
+#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
#include <linux/lustre_ha.h>
#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
#include <linux/lustre_lite.h> /* for ll_i2info */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
-static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- GOTO(out, rc);
- out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
{
struct ptlrpc_request *request;
struct ost_body *body;
+ struct lov_stripe_md *lsm;
int rc, size = sizeof(*body);
ENTRY;
- if (!oa) {
- CERROR("oa NULL\n");
- RETURN(-EINVAL);
- }
-
- if (!ea) {
- LBUG();
- }
+ LASSERT(oa);
+ LASSERT(ea);
- if (!*ea) {
- OBD_ALLOC(*ea, oa->o_easize);
- if (!*ea)
+ lsm = *ea;
+ if (!lsm) {
+ // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
+ OBD_ALLOC(lsm, oa->o_easize);
+ if (!lsm)
RETURN(-ENOMEM);
- (*ea)->lmd_easize = oa->o_easize;
+ lsm->lsm_mds_easize = oa->o_easize;
}
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
NULL);
if (!request)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out, rc);
+ GOTO(out_req, rc);
body = lustre_msg_buf(request->rq_repmsg, 0);
memcpy(oa, &body->oa, sizeof(*oa));
- (*ea)->lmd_object_id = oa->o_id;
- (*ea)->lmd_stripe_count = 1;
+ lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_stripe_count = 0;
+ *ea = lsm;
EXIT;
- out:
- ptlrpc_free_req(request);
+out_req:
+ ptlrpc_req_finished(request);
+out:
+ if (rc && !*ea)
+ OBD_FREE(lsm, oa->o_easize);
return rc;
}
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
memcpy(&body->oa, oa, sizeof(*oa));
- /* overload the blocks and size fields in the oa with start/end */
-#warning FIXME: endianness, size=start, blocks=end?
- body->oa.o_blocks = start;
- body->oa.o_size = end;
- body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
+ /* overload the size and blocks fields in the oa with start/end */
+ body->oa.o_size = HTON__u64(start);
+ body->oa.o_blocks = HTON__u64(end);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1, &size,
- NULL);
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
+ &size, NULL);
if (!request)
RETURN(-ENOMEM);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
ENTRY;
/* This feels wrong to me. */
- list_for_each(tmp, &desc->b_page_list) {
+ list_for_each(tmp, &desc->bd_page_list) {
struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
- kunmap(bulk->b_page);
+ kunmap(bulk->bp_page);
}
ptlrpc_bulk_decref(desc);
int err = 0;
ENTRY;
- if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
- err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
+ if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
+ err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
-ETIMEDOUT);
}
if (cb_data->callback)
cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
- OBD_FREE(cb_data->obd_data, cb_data->obd_size);
+ if (cb_data->obd_data)
+ OBD_FREE(cb_data->obd_data, cb_data->obd_size);
OBD_FREE(cb_data, sizeof(*cb_data));
/* We can't kunmap the desc from interrupt context, so we do it from
* the bottom half above. */
- INIT_TQUEUE(&desc->b_queue, 0, 0);
- PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
- schedule_task(&desc->b_queue);
+ prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
+ schedule_work(&desc->bd_queue);
EXIT;
}
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
obd_count page_count, struct brw_page *pga,
brw_callback_t callback, struct io_cb_data *data)
{
desc = ptlrpc_prep_bulk(connection);
if (!desc)
GOTO(out_req, rc = -ENOMEM);
- desc->b_portal = OST_BULK_PORTAL;
- desc->b_cb = brw_finish;
+ desc->bd_portal = OST_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
GOTO(out_desc, rc = -ENOMEM);
cb_data->callback = callback;
cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
data->desc = desc;
- desc->b_cb_data = cb_data;
+ desc->bd_cb_data = cb_data;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(&iooptr, lsm, page_count);
/* end almost identical to brw_write case */
spin_lock(&connection->c_lock);
if (bulk == NULL)
GOTO(out_unmap, rc = -ENOMEM);
- bulk->b_xid = xid; /* single xid for all pages */
+ bulk->bp_xid = xid; /* single xid for all pages */
- bulk->b_buf = kmap(pga[mapped].pg);
- bulk->b_page = pga[mapped].pg;
- bulk->b_buflen = PAGE_SIZE;
+ bulk->bp_buf = kmap(pga[mapped].pg);
+ bulk->bp_page = pga[mapped].pg;
+ bulk->bp_buflen = PAGE_SIZE;
ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
- pga[mapped].flag, bulk->b_xid);
+ pga[mapped].flag, bulk->bp_xid);
}
/*
* such as a timeout in a sleep that it performs, brw_finish
* will never get called, and we'll leak the desc, fail to kunmap
* things, cats will live with dogs. One solution would be to
- * export brw_finish as osc_brw_finish, so that the timeout case and
- * its kin could call it for proper cleanup. An alternative would
- * be for an error return from the callback to cause us to clean up,
- * but that doesn't help the truly async cases (like LOV), which
- * will immediately return from their PHASE_START callback, before
- * any such cleanup-requiring error condition can be detected.
+ * export brw_finish as osc_brw_finish, so that the timeout case
+ * and its kin could call it for proper cleanup. An alternative
+ * would be for an error return from the callback to cause us to
+ * clean up, but that doesn't help the truly async cases (like
+ * LOV), which will immediately return from their PHASE_START
+ * callback, before any such cleanup-requiring error condition can
+ * be detected.
*/
if (rc)
GOTO(out_req, rc);
/* Callbacks cause asynchronous handling. */
- rc = callback(data, 0, CB_PHASE_START);
+ rc = callback(data, 0, CB_PHASE_START);
out_req:
ptlrpc_req_finished(request);
desc = ptlrpc_prep_bulk(connection);
if (!desc)
GOTO(out_req, rc = -ENOMEM);
- desc->b_portal = OSC_BULK_PORTAL;
- desc->b_cb = brw_finish;
+ desc->bd_portal = OSC_BULK_PORTAL;
+ desc->bd_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
GOTO(out_desc, rc = -ENOMEM);
cb_data->callback = callback;
cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
data->desc = desc;
- desc->b_cb_data = cb_data;
+ desc->bd_cb_data = cb_data;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
for (mapped = 0; mapped < page_count; mapped++) {
local[mapped].addr = kmap(pga[mapped].pg);
+
+ CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
+ "%d ; page %d of %d\n",
+ local[mapped].addr, pga[mapped].pg->flags,
+ page_count(pga[mapped].pg),
+ mapped, page_count - 1);
+
local[mapped].offset = pga[mapped].off;
local[mapped].len = pga[mapped].count;
ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
if (!bulk)
GOTO(out_unmap, rc = -ENOMEM);
- bulk->b_buf = (void *)(unsigned long)local[j].addr;
- bulk->b_buflen = local[j].len;
- bulk->b_xid = remote->xid;
- bulk->b_page = pga[j].pg;
+ bulk->bp_buf = (void *)(unsigned long)local[j].addr;
+ bulk->bp_buflen = local[j].len;
+ bulk->bp_xid = remote->xid;
+ bulk->bp_page = pga[j].pg;
}
- if (desc->b_page_count != page_count)
+ if (desc->bd_page_count != page_count)
LBUG();
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
struct brw_page *pga, brw_callback_t callback,
struct io_cb_data *data)
{
- if (cmd & OBD_BRW_WRITE)
- return osc_brw_write(conn, md, page_count, pga, callback, data);
- else
- return osc_brw_read(conn, md, page_count, pga, callback, data);
+ ENTRY;
+
+ while (page_count) {
+ obd_count pages_per_brw;
+ int rc;
+
+ if (page_count > PTL_MD_MAX_IOV)
+ pages_per_brw = PTL_MD_MAX_IOV;
+ else
+ pages_per_brw = page_count;
+
+ if (cmd & OBD_BRW_WRITE)
+ rc = osc_brw_write(conn, md, pages_per_brw, pga,
+ callback, data);
+ else
+ rc = osc_brw_read(conn, md, pages_per_brw, pga,
+ callback, data);
+
+ if (rc != 0)
+ RETURN(rc);
+
+ page_count -= pages_per_brw;
+ pga += pages_per_brw;
+ }
+ RETURN(0);
}
-static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
- struct lustre_handle *parent_lock,
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
+ struct lustre_handle *parent_lock,
__u32 type, void *extentp, int extent_len, __u32 mode,
int *flags, void *callback, void *data, int datalen,
struct lustre_handle *lockh)
{
- __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
+ __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
struct obd_device *obddev = class_conn2obd(connh);
struct ldlm_extent *extent = extentp;
int rc;
- __u32 mode2;
+ ENTRY;
- /* Filesystem locks are given a bit of special treatment: first we
+ /* Filesystem locks are given a bit of special treatment: if
+ * this is not a file size lock (which has end == -1), we
* fixup the lock to start and end on page boundaries. */
- extent->start &= PAGE_MASK;
- extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ if (extent->end != OBD_OBJECT_EOF) {
+ extent->start &= PAGE_MASK;
+ extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ }
/* Next, search for already existing extent locks that will cover us */
- //osc_con2dlmcl(conn, &cl, &connection, &rconn);
rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
sizeof(extent), mode, lockh);
- if (rc == 1) {
+ if (rc == 1)
/* We already have a lock, and it's referenced */
- return 0;
- }
+ RETURN(ELDLM_OK);
- /* Next, search for locks that we can upgrade (if we're trying to write)
- * or are more than we need (if we're trying to read). Because the VFS
- * and page cache already protect us locally, lots of readers/writers
- * can share a single PW lock. */
- if (mode == LCK_PW)
- mode2 = LCK_PR;
- else
- mode2 = LCK_PW;
-
- rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
- sizeof(extent), mode2, lockh);
- if (rc == 1) {
- int flags;
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- ldlm_lock_addref(lockh, mode);
- ldlm_lock_decref(lockh, mode2);
-
- if (mode == LCK_PR)
- return 0;
-
- rc = ldlm_cli_convert(lockh, mode, &flags);
- if (rc)
- LBUG();
-
- return rc;
+ /* If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too. */
+
+ if (mode == LCK_PR) {
+ rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
+ extent, sizeof(extent), LCK_PW, lockh);
+ if (rc == 1) {
+ /* FIXME: This is not incredibly elegant, but it might
+ * be more elegant than adding another parameter to
+ * lock_match. I want a second opinion. */
+ ldlm_lock_addref(lockh, LCK_PR);
+ ldlm_lock_decref(lockh, LCK_PW);
+
+ RETURN(ELDLM_OK);
+ }
}
- rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
- parent_lock, res_id, type, extent,
- sizeof(extent), mode, flags, ldlm_completion_ast,
- callback, data, datalen, lockh);
- return rc;
+ rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
+ res_id, type, extent, sizeof(extent), mode, flags,
+ ldlm_completion_ast, callback, data, datalen,
+ lockh);
+ RETURN(rc);
}
static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
RETURN(0);
}
-static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
+/* Cancel all currently-unused DLM extent locks cached on this stripe
+ * object's resource.  Builds the resource name from the object id and
+ * delegates to ldlm_cli_cancel_unused; `flags` is passed straight
+ * through (NOTE(review): flag semantics defined by the LDLM layer —
+ * confirm against ldlm_cli_cancel_unused).  New o_cancel_unused hook. */
+static int osc_cancel_unused(struct lustre_handle *connh,
+ struct lov_stripe_md *lsm, int flags)
+{
+ struct obd_device *obddev = class_conn2obd(connh);
+ __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
+
+ return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
+}
+
+static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
{
struct ptlrpc_request *request;
- struct obd_statfs *osfs;
int rc, size = sizeof(*osfs);
ENTRY;
GOTO(out, rc);
}
- osfs = lustre_msg_buf(request->rq_repmsg, 0);
- obd_statfs_unpack(osfs, sfs);
+ obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
int err = 0;
ENTRY;
- if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) <
- IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
- CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
- _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
- RETURN(-EINVAL);
- }
-
switch (cmd) {
case IOC_LDLM_TEST: {
err = ldlm_test(obddev, conn);
GOTO(out, err);
}
case IOC_LDLM_REGRESS_START: {
- unsigned int numthreads = 1;
- unsigned int numheld = 10;
- unsigned int numres = 10;
+ unsigned int numthreads = 1;
+ unsigned int numheld = 10;
+ unsigned int numres = 10;
unsigned int numext = 10;
char *parse;
-
+
if (data->ioc_inllen1) {
parse = data->ioc_inlbuf1;
if (*parse != '\0') {
}
}
- err = ldlm_regression_start(obddev, conn, numthreads,
+ err = ldlm_regression_start(obddev, conn, numthreads,
numheld, numres, numext);
CERROR("-- done err %d\n", err);
CERROR("-- done err %d\n", err);
GOTO(out, err);
}
+ case IOC_OSC_REGISTER_LOV: {
+ if (obddev->u.cli.cl_containing_lov)
+ GOTO(out, err = -EALREADY);
+ obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
+ GOTO(out, err);
+ }
+
default:
- GOTO(out, err = -EINVAL);
+ GOTO(out, err = -ENOTTY);
}
out:
return err;
}
+/* o_attach hook: register this OBD device's lprocfs (/proc) status
+ * variables (status_var_nm_1, declared extern above).
+ * NOTE(review): `len` and `data` are unused here — presumably only
+ * present to satisfy the o_attach method signature; confirm. */
+int osc_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ int rc;
+ rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+ return rc;
+}
+
+/* o_detach hook: tear down the lprocfs entries registered by
+ * osc_attach for this device. */
+int osc_detach(struct obd_device *dev)
+{
+ int rc;
+ rc = lprocfs_dereg_obd(dev);
+ return rc;
+
+}
struct obd_ops osc_obd_ops = {
+ o_attach: osc_attach,
+ o_detach: osc_detach,
o_setup: client_obd_setup,
o_cleanup: client_obd_cleanup,
o_statfs: osc_statfs,
o_punch: osc_punch,
o_enqueue: osc_enqueue,
o_cancel: osc_cancel,
+ o_cancel_unused: osc_cancel_unused,
o_iocontrol: osc_iocontrol
};
static int __init osc_init(void)
{
- return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
+ int rc;
+
+ /* class_register_type now also takes an lprocfs variable table
+  * (status_class_var, declared extern above) — presumably to hook up
+  * per-type /proc statistics; confirm against the class driver. */
+ rc = class_register_type(&osc_obd_ops,
+ (lprocfs_vars_t*)status_class_var,
+ LUSTRE_OSC_NAME);
+ if (rc)
+ RETURN(rc);
+ return 0;
+
}
static void __exit osc_exit(void)