#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/version.h>
#include <linux/module.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/workqueue.h>
+#endif
+#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
#include <linux/lustre_ha.h>
#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
#include <linux/lustre_lite.h> /* for ll_i2info */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
+
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md)
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
{
struct ptlrpc_request *request;
struct ost_body *body;
+ struct lov_stripe_md *lsm;
int rc, size = sizeof(*body);
ENTRY;
- if (!oa) {
- CERROR("oa NULL\n");
- RETURN(-EINVAL);
- }
-
- if (!ea) {
- LBUG();
- }
+ LASSERT(oa);
+ LASSERT(ea);
- if (!*ea) {
+ lsm = *ea;
+ if (!lsm) {
// XXX check oa->o_valid & OBD_MD_FLEASIZE first...
- OBD_ALLOC(*ea, oa->o_easize);
- if (!*ea)
+ OBD_ALLOC(lsm, oa->o_easize);
+ if (!lsm)
RETURN(-ENOMEM);
- (*ea)->lsm_mds_easize = oa->o_easize;
+ lsm->lsm_mds_easize = oa->o_easize;
}
request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
NULL);
if (!request)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out, rc);
+ GOTO(out_req, rc);
body = lustre_msg_buf(request->rq_repmsg, 0);
memcpy(oa, &body->oa, sizeof(*oa));
- (*ea)->lsm_object_id = oa->o_id;
- (*ea)->lsm_stripe_count = 0;
+ lsm->lsm_object_id = oa->o_id;
+ lsm->lsm_stripe_count = 0;
+ *ea = lsm;
EXIT;
- out:
- ptlrpc_free_req(request);
+out_req:
+ ptlrpc_req_finished(request);
+out:
+ if (rc && !*ea)
+ OBD_FREE(lsm, oa->o_easize);
return rc;
}
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
memcpy(&body->oa, oa, sizeof(*oa));
- /* overload the blocks and size fields in the oa with start/end */
-#warning FIXME: endianness, size=start, blocks=end?
- body->oa.o_blocks = start;
- body->oa.o_size = end;
- body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
+ /* overload the size and blocks fields in the oa with start/end */
+ body->oa.o_size = HTON__u64(start);
+ body->oa.o_blocks = HTON__u64(end);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
request->rq_replen = lustre_msg_size(1, &size);
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
if (cb_data->callback)
cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
- OBD_FREE(cb_data->obd_data, cb_data->obd_size);
+ if (cb_data->obd_data)
+ OBD_FREE(cb_data->obd_data, cb_data->obd_size);
OBD_FREE(cb_data, sizeof(*cb_data));
/* We can't kunmap the desc from interrupt context, so we do it from
* the bottom half above. */
- INIT_TQUEUE(&desc->bd_queue, 0, 0);
- PREPARE_TQUEUE(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
- schedule_task(&desc->bd_queue);
+ prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
+ schedule_work(&desc->bd_queue);
EXIT;
}
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
obd_count page_count, struct brw_page *pga,
brw_callback_t callback, struct io_cb_data *data)
{
cb_data->callback = callback;
cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
data->desc = desc;
desc->bd_cb_data = cb_data;
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
- ost_pack_ioo(&iooptr, md, page_count);
+ ost_pack_ioo(&iooptr, lsm, page_count);
/* end almost identical to brw_write case */
spin_lock(&connection->c_lock);
cb_data->callback = callback;
cb_data->cb_data = data;
+ CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
data->desc = desc;
desc->bd_cb_data = cb_data;
CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
"%d ; page %d of %d\n",
local[mapped].addr, pga[mapped].pg->flags,
- page_count(pga[mapped].pg),
+ page_count(pga[mapped].pg),
mapped, page_count - 1);
local[mapped].offset = pga[mapped].off;
struct brw_page *pga, brw_callback_t callback,
struct io_cb_data *data)
{
- if (cmd & OBD_BRW_WRITE)
- return osc_brw_write(conn, md, page_count, pga, callback, data);
- else
- return osc_brw_read(conn, md, page_count, pga, callback, data);
+ ENTRY;
+
+ while (page_count) {
+ obd_count pages_per_brw;
+ int rc;
+
+ if (page_count > PTL_MD_MAX_IOV)
+ pages_per_brw = PTL_MD_MAX_IOV;
+ else
+ pages_per_brw = page_count;
+
+ if (cmd & OBD_BRW_WRITE)
+ rc = osc_brw_write(conn, md, pages_per_brw, pga,
+ callback, data);
+ else
+ rc = osc_brw_read(conn, md, pages_per_brw, pga,
+ callback, data);
+
+ if (rc != 0)
+ RETURN(rc);
+
+ page_count -= pages_per_brw;
+ pga += pages_per_brw;
+ }
+ RETURN(0);
}
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
struct obd_device *obddev = class_conn2obd(connh);
struct ldlm_extent *extent = extentp;
int rc;
- __u32 mode2;
+ ENTRY;
- /* Filesystem locks are given a bit of special treatment: first we
+ /* Filesystem locks are given a bit of special treatment: if
+ * this is not a file size lock (which has end == -1), we
* fixup the lock to start and end on page boundaries. */
- extent->start &= PAGE_MASK;
- extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ if (extent->end != OBD_OBJECT_EOF) {
+ extent->start &= PAGE_MASK;
+ extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+ }
/* Next, search for already existing extent locks that will cover us */
- //osc_con2dlmcl(conn, &cl, &connection, &rconn);
rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
sizeof(extent), mode, lockh);
- if (rc == 1) {
+ if (rc == 1)
/* We already have a lock, and it's referenced */
- return 0;
- }
-
- /* Next, search for locks that we can upgrade (if we're trying to write)
- * or are more than we need (if we're trying to read). Because the VFS
- * and page cache already protect us locally, lots of readers/writers
- * can share a single PW lock. */
- if (mode == LCK_PW)
- mode2 = LCK_PR;
- else
- mode2 = LCK_PW;
+ RETURN(ELDLM_OK);
- rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
- sizeof(extent), mode2, lockh);
- if (rc == 1) {
- int flags;
- /* FIXME: This is not incredibly elegant, but it might
- * be more elegant than adding another parameter to
- * lock_match. I want a second opinion. */
- ldlm_lock_addref(lockh, mode);
- ldlm_lock_decref(lockh, mode2);
-
- if (mode == LCK_PR)
- return 0;
-
- rc = ldlm_cli_convert(lockh, mode, &flags);
- if (rc)
- LBUG();
-
- return rc;
+ /* If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too. */
+
+ if (mode == LCK_PR) {
+ rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
+ extent, sizeof(extent), LCK_PW, lockh);
+ if (rc == 1) {
+ /* FIXME: This is not incredibly elegant, but it might
+ * be more elegant than adding another parameter to
+ * lock_match. I want a second opinion. */
+ ldlm_lock_addref(lockh, LCK_PR);
+ ldlm_lock_decref(lockh, LCK_PW);
+
+ RETURN(ELDLM_OK);
+ }
}
- rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
- parent_lock, res_id, type, extent,
- sizeof(extent), mode, flags, ldlm_completion_ast,
- callback, data, datalen, lockh);
- return rc;
+ rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
+ res_id, type, extent, sizeof(extent), mode, flags,
+ ldlm_completion_ast, callback, data, datalen,
+ lockh);
+ RETURN(rc);
}
static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
}
static int osc_cancel_unused(struct lustre_handle *connh,
- struct lov_stripe_md *lsm)
+ struct lov_stripe_md *lsm, int flags)
{
struct obd_device *obddev = class_conn2obd(connh);
__u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
- return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id);
+ return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
}
static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
EXIT;
out:
- ptlrpc_free_req(request);
+ ptlrpc_req_finished(request);
return rc;
}
int err = 0;
ENTRY;
- if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
- _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
- CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
- _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
- RETURN(-EINVAL);
- }
-
switch (cmd) {
case IOC_LDLM_TEST: {
err = ldlm_test(obddev, conn);
CERROR("-- done err %d\n", err);
GOTO(out, err);
}
+ case IOC_OSC_REGISTER_LOV: {
+ if (obddev->u.cli.cl_containing_lov)
+ GOTO(out, err = -EALREADY);
+ obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
+ GOTO(out, err);
+ }
+
default:
- GOTO(out, err = -EINVAL);
+ GOTO(out, err = -ENOTTY);
}
out:
return err;
}
+int osc_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ int rc;
+ rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+ return rc;
+}
+
+int osc_detach(struct obd_device *dev)
+{
+ int rc;
+ rc = lprocfs_dereg_obd(dev);
+ return rc;
+
+}
struct obd_ops osc_obd_ops = {
+ o_attach: osc_attach,
+ o_detach: osc_detach,
o_setup: client_obd_setup,
o_cleanup: client_obd_cleanup,
o_statfs: osc_statfs,
static int __init osc_init(void)
{
- return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
+ int rc;
+
+ rc = class_register_type(&osc_obd_ops,
+ (lprocfs_vars_t*)status_class_var,
+ LUSTRE_OSC_NAME);
+ if (rc)
+ RETURN(rc);
+ return 0;
+
}
static void __exit osc_exit(void)