*
* Copyright (C) 2002 Cluster File Systems, Inc.
* Author: Phil Schwan <phil@off.net>
+ * Peter Braam <braam@clusterfs.com>
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
extern struct obd_device obd_dev[MAX_OBD_DEVICES];
/* obd methods */
-
static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
{
struct ptlrpc_request *req;
struct lov_obd *lov = &obd->u.lov;
struct lustre_handle mdc_conn;
uuid_t *uuidarray;
- int rc;
+ int rc, rc2;
int i;
MOD_INC_USE_COUNT;
RETURN(rc);
}
+ /* retrieve LOV metadata from MDS */
rc = obd_connect(&mdc_conn, lov->mdcobd);
if (rc) {
CERROR("cannot connect to mdc: rc = %d\n", rc);
}
rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
- obd_disconnect(&mdc_conn);
-
- if (rc) {
- CERROR("cannot get lov info %d\n", rc);
- GOTO(out, rc);
- }
-
-
- if (lov->desc.ld_tgt_count > 1000) {
- CERROR("configuration error: target count > 1000 (%d)\n",
- lov->desc.ld_tgt_count);
- GOTO(out, rc = -EINVAL);
+ rc2 = obd_disconnect(&mdc_conn);
+ if (rc || rc2) {
+ CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2);
+ GOTO(out, (rc) ? rc : rc2 );
}
+ /* sanity... */
if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
CERROR("lov uuid %s not on mds device (%s)\n",
obd->obd_uuid, lov->desc.ld_uuid);
GOTO(out, rc = -EINVAL);
}
-
+ if (lov->desc.ld_tgt_count > 1000) {
+ CERROR("configuration error: target count > 1000 (%d)\n",
+ lov->desc.ld_tgt_count);
+ GOTO(out, rc = -EINVAL);
+ }
if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
sizeof(uuid_t) * lov->desc.ld_tgt_count) {
CERROR("invalid uuid array returned\n");
for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
if (!tgt) {
- CERROR("Target %s not configured\n", uuidarray[i]);
+ CERROR("Target %s not attached\n", uuidarray[i]);
GOTO(out_mem, rc = -EINVAL);
}
+ if (!(tgt->obd_flags & OBD_SET_UP)) {
+ CERROR("Target %s not set up\n", uuidarray[i]);
+ GOTO(out_mem, rc = -EINVAL);
+ }
rc = obd_connect(&lov->tgts[i].conn, tgt);
if (rc) {
CERROR("Target %s connect error %d\n",
out_mem:
if (rc) {
for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
- int rc2;
rc2 = obd_disconnect(&lov->tgts[i].conn);
if (rc2)
CERROR("BAD: Target %s disconnect error %d\n",
out:
if (rc)
class_disconnect(conn);
-
ptlrpc_free_req(req);
return rc;
}
lov->tgts = NULL;
out_local:
-
rc = class_disconnect(conn);
if (!rc)
MOD_DEC_USE_COUNT;
-
return rc;
}
RETURN(-EINVAL);
}
- /* FIXME: we should make a connection instead perhaps to avoid
- the mdc from walking away? The fs guarantees this. */
lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
if (!lov->mdcobd) {
CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
return size;
}
+/* the LOV counts on oa->o_id to be set as the LOV object id */
static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
{
int rc = 0, i;
CERROR("lov_create needs EA for striping information\n");
RETURN(-EINVAL);
}
-
if (!export)
RETURN(-EINVAL);
lov = &export->exp_obd->u.lov;
}
md = *ea;
- md->lmd_size = oa->o_easize;
+ md->lmd_easize = oa->o_easize;
md->lmd_object_id = oa->o_id;
if (!md->lmd_stripe_count) {
- md->lmd_stripe_count = lov->desc.ld_default_stripecount;
+ md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
}
for (i = 0; i < md->lmd_stripe_count; i++) {
}
static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
-struct lov_stripe_md *ea)
+ struct lov_stripe_md *md)
{
int rc = 0, i;
struct obdo tmp;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
- struct lov_stripe_md *md;
ENTRY;
- if (!ea) {
+ if (!md) {
CERROR("LOV requires striping ea for desctruction\n");
RETURN(-EINVAL);
}
RETURN(-ENODEV);
lov = &export->exp_obd->u.lov;
- md = ea;
-
for (i = 0; i < md->lmd_stripe_count; i++) {
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
RETURN(rc);
}
-#if 0
-static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
- rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
+ rc = obd_getattr(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ /* XXX can do something more sophisticated here... */
+ if (i == 0 ) {
+ obd_id id = oa->o_id;
+ memcpy(oa, &tmp, sizeof(tmp));
+ oa->o_id = id;
+ }
+ }
RETURN(rc);
}
-static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error setattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
RETURN(rc);
}
-static int lov_open(struct lustre_handle *conn, struct obdo *oa)
+static int lov_open(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
RETURN(rc);
}
-static int lov_close(struct lustre_handle *conn, struct obdo *oa)
+
+static int lov_close(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
- RETURN(rc);
-}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+ rc = obd_close(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
+ RETURN(rc);
+}
-/* FIXME: maybe we'll just make one node the authoritative attribute node, then
- * we can send this 'punch' to just the authoritative node and the nodes
- * that the punch will affect. */
-static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
- obd_size count, obd_off offset)
+/* compute offset in stripe i corresponds to offset "in" */
+__u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
{
- int rc, retval, i;
- ENTRY;
-
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ __u32 ssz = md->lmd_stripe_size;
+ /* full stripes across all * stripe size */
+ __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz;
+ __u32 off = (__u32)in % (md->lmd_stripe_count * ssz);
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
- offset);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if ( in == 0xffffffffffffffff ) {
+ return 0xffffffffffffffff;
}
- RETURN(rc);
+ if ( (i+1) * ssz < off )
+ out += ssz;
+ else if ( i * ssz > off )
+ out += 0;
+ else
+ out += (off - (i * ssz)) % ssz;
+
+ return (__u64) out;
}
+
struct lov_callback_data {
atomic_t count;
- wait_queue_head waitq;
+ wait_queue_head_t waitq;
};
static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGTERM) ||
sigismember(&(current->pending.signal), SIGINT)) {
- cb_data->flags |= PTL_RPC_FL_INTR;
+ // FIXME XXX what here
+ // cb_data->flags |= PTL_RPC_FL_INTR;
RETURN(1);
}
if (atomic_read(&cb_data->count) == 0)
RETURN(0);
}
-/* buffer must lie in user memory here */
+
+/* FIXME: maybe we'll just make one node the authoritative attribute node, then
+ * we can send this 'punch' to just the authoritative node and the nodes
+ * that the punch will affect. */
+static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md,
+ obd_off start, obd_off end)
+{
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ ENTRY;
+
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ __u64 starti = lov_offset(md, start, i);
+ __u64 endi = lov_offset(md, end, i);
+
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL,
+ starti, endi);
+ if (!rc) {
+ CERROR("Error punch object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
+ RETURN(rc);
+}
+
+
+#if 0
static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
struct obdo **oa,
obd_count *oa_bufs, struct page **buf,
o_disconnect: lov_disconnect,
o_create: lov_create,
o_destroy: lov_destroy,
-#if 0
o_getattr: lov_getattr,
o_setattr: lov_setattr,
o_open: lov_open,
o_close: lov_close,
+#if 0
o_brw: lov_pgcache_brw,
o_punch: lov_punch,
o_enqueue: lov_enqueue,