/* obd methods */
-static int lov_getinfo(struct obd_device *obd,
- struct lov_desc *desc,
- uuid_t **uuids,
- struct ptlrpc_request **request)
-{
- struct ptlrpc_request *req;
- struct mds_status_req *streq;
- struct lov_obd *lov = &obd->u.lov;
- struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
- int rc, size[2] = {sizeof(*streq)};
- ENTRY;
-
- req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
- MDS_LOVINFO, 1, size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- *request = req;
- streq = lustre_msg_buf(req->rq_reqmsg, 0);
- streq->flags = HTON__u32(MDS_STATUS_LOV);
- streq->repbuf = HTON__u32(8000);
-
- /* prepare for reply */
- req->rq_level = LUSTRE_CONN_CON;
- size[0] = sizeof(*desc);
- size[1] = 8000;
- req->rq_replen = lustre_msg_size(2, size);
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
-
- if (!rc) {
- memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
- *uuids = lustre_msg_buf(req->rq_repmsg, 1);
- lov_unpackdesc(desc);
- }
- EXIT;
- out:
- return rc;
-}
-
static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
{
- int rc;
- int i;
struct ptlrpc_request *req;
struct lov_obd *lov = &obd->u.lov;
- uuid_t *uuidarray;
+ struct lustre_handle mdc_conn;
+ uuid_t *uuidarray;
+ int rc;
+ int i;
MOD_INC_USE_COUNT;
rc = class_connect(conn, obd);
- if (rc) {
+ if (rc) {
MOD_DEC_USE_COUNT;
- RETURN(rc);
+ RETURN(rc);
}
-
- rc = lov_getinfo(obd, &lov->desc, &uuidarray, &req);
- if (rc) {
+
+ rc = obd_connect(&mdc_conn, lov->mdcobd);
+ if (rc) {
+ CERROR("cannot connect to mdc: rc = %d\n", rc);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
+ obd_disconnect(&mdc_conn);
+
+ if (rc) {
CERROR("cannot get lov info %d\n", rc);
- GOTO(out, rc);
+ GOTO(out, rc);
}
-
- if (lov->desc.ld_tgt_count > 1000) {
+
+
+ if (lov->desc.ld_tgt_count > 1000) {
CERROR("configuration error: target count > 1000 (%d)\n",
lov->desc.ld_tgt_count);
- GOTO(out, rc = -EINVAL);
+ GOTO(out, rc = -EINVAL);
}
-
- if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
- CERROR("lov uuid %s not on mds device (%s)\n",
+
+ if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
+ CERROR("lov uuid %s not on mds device (%s)\n",
obd->obd_uuid, lov->desc.ld_uuid);
- GOTO(out, rc = -EINVAL);
- }
-
- if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
- sizeof(uuid_t) * lov->desc.ld_tgt_count) {
- CERROR("invalid uuid array returned\n");
- GOTO(out, rc = -EINVAL);
+ GOTO(out, rc = -EINVAL);
}
- lov->bufsize = sizeof(struct lov_tgt_desc) * lov->desc.ld_tgt_count;
- OBD_ALLOC(lov->tgts, lov->bufsize);
- if (!lov->tgts) {
- CERROR("Out of memory\n");
- GOTO(out, rc = -ENOMEM);
+ if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
+ sizeof(uuid_t) * lov->desc.ld_tgt_count) {
+ CERROR("invalid uuid array returned\n");
+ GOTO(out, rc = -EINVAL);
}
- uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
- for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
- memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
+ lov->bufsize = sizeof(struct lov_tgt_desc) * lov->desc.ld_tgt_count;
+ OBD_ALLOC(lov->tgts, lov->bufsize);
+ if (!lov->tgts) {
+ CERROR("Out of memory\n");
+ GOTO(out, rc = -ENOMEM);
}
- for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
+ uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
+ for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
+ memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
+
+ for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
- if (!tgt) {
- CERROR("Target %s not configured\n", uuidarray[i]);
- GOTO(out_mem, rc = -EINVAL);
+ if (!tgt) {
+ CERROR("Target %s not configured\n", uuidarray[i]);
+ GOTO(out_mem, rc = -EINVAL);
}
- rc = obd_connect(&lov->tgts[i].conn, tgt);
- if (rc) {
- CERROR("Target %s connect error %d\n",
- uuidarray[i], rc);
+ rc = obd_connect(&lov->tgts[i].conn, tgt);
+ if (rc) {
+ CERROR("Target %s connect error %d\n",
+ uuidarray[i], rc);
GOTO(out_mem, rc);
}
}
out_mem:
- if (rc) {
- for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
- rc = obd_disconnect(&lov->tgts[i].conn);
- if (rc)
- CERROR("Target %s disconnect error %d\n",
- uuidarray[i], rc);
+ if (rc) {
+ for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
+ int rc2;
+ rc2 = obd_disconnect(&lov->tgts[i].conn);
+ if (rc2)
+ CERROR("BAD: Target %s disconnect error %d\n",
+ uuidarray[i], rc2);
}
OBD_FREE(lov->tgts, lov->bufsize);
}
out:
- if (rc) {
+ if (rc)
class_disconnect(conn);
- }
- if (req)
- ptlrpc_free_req(req);
+
+ ptlrpc_free_req(req);
return rc;
-
}
static int lov_disconnect(struct lustre_handle *conn)
for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
rc = obd_disconnect(&lov->tgts[i].conn);
- if (rc)
+ if (rc) {
CERROR("Target %s disconnect error %d\n",
lov->tgts[i].uuid, rc);
+ RETURN(rc);
+ }
}
OBD_FREE(lov->tgts, lov->bufsize);
lov->bufsize = 0;
RETURN(rc);
}
+
+static inline int lov_stripe_md_size(struct obd_device *obd)
+{
+ struct lov_obd *lov = &obd->u.lov;
+ int size;
+
+ size = sizeof(struct lov_stripe_md) +
+ lov->desc.ld_tgt_count * sizeof(struct lov_object_id);
+ return size;
+}
+
+static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
+{
+ int rc, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ struct lov_stripe_md *md;
+ ENTRY;
+
+ if (!ea) {
+ CERROR("lov_create needs EA for striping information\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export)
+ RETURN(-EINVAL);
+ lov = &export->exp_obd->u.lov;
+
+ oa->o_easize = lov_stripe_md_size(export->exp_obd);
+ if (!*ea) {
+ OBD_ALLOC(*ea, oa->o_easize);
+ if (! *ea)
+ RETURN(-ENOMEM);
+ }
+
+ md = *ea;
+ md->lmd_size = oa->o_easize;
+ md->lmd_object_id = oa->o_id;
+ if (!md->lmd_stripe_count) {
+ md->lmd_stripe_count = lov->desc.ld_default_stripecount;
+ }
+
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ struct lov_stripe_md obj_md;
+ struct lov_stripe_md *obj_mdp = &obj_md;
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ tmp.o_easize = sizeof(struct lov_stripe_md);
+ rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
+ if (rc)
+ GOTO(out_cleanup, rc);
+ md->lmd_objects[i].l_object_id = tmp.o_id;
+ }
+
+ out_cleanup:
+ if (rc) {
+ int i2, rc2;
+ for (i2 = 0 ; i2 < i ; i2++) {
+ /* destroy already created objects here */
+ tmp.o_id = md->lmd_objects[i].l_object_id;
+ rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
+ if (rc2) {
+ CERROR("Failed to remove object from target %d\n",
+ i2);
+ }
+ }
+ }
+ return rc;
+}
+
+static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
+struct lov_stripe_md *ea)
+{
+ int rc, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ struct lov_stripe_md *md;
+ ENTRY;
+
+ if (!ea) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ md = ea;
+
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+ rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error destroying object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
+ RETURN(rc);
+}
+
#if 0
static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
{
RETURN(rc);
}
-static int lov_create(struct lustre_handle *conn, struct obdo *oa)
-{
- int rc, retval, i, offset;
- struct obdo tmp;
- struct lov_md md;
- ENTRY;
-
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- md.lmd_object_id = oa->o_id;
- md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
-
- memset(oa->o_inline, 0, sizeof(oa->o_inline));
- offset = sizeof(md);
- for (i = 0; i < md.lmd_stripe_count; i++) {
- struct lov_object_id lov_id;
- rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
- if (i == 0) {
- memcpy(oa, &tmp, sizeof(tmp));
- retval = rc;
- } else if (retval != rc)
- CERROR("return codes didn't match (%d, %d)\n",
- retval, rc);
- lov_id = (struct lov_object_id *)(oa->o_inline + offset);
- lov_id->l_device_id = i;
- lov_id->l_object_id = tmp.o_id;
- offset += sizeof(*lov_id);
- }
- memcpy(oa->o_inline, &md, sizeof(md));
-
- return rc;
-}
-
-static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
-{
- int rc, retval, i, offset;
- struct obdo tmp;
- struct lov_md *md;
- struct lov_object_id lov_id;
- ENTRY;
-
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- md = (struct lov_md *)oa->o_inline;
- memcpy(&tmp, oa, sizeof(tmp));
-
- offset = sizeof(md);
- for (i = 0; i < md->lmd_stripe_count; i++) {
- struct lov_object_id *lov_id;
- lov_id = (struct lov_object_id *)(oa->o_inline + offset);
-
- tmp.o_id = lov_id->l_object_id;
-
- rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("return codes didn't match (%d, %d)\n",
- retval, rc);
- offset += sizeof(*lov_id);
- }
-
- return rc;
-}
/* FIXME: maybe we'll just make one node the authoritative attribute node, then
* we can send this 'punch' to just the authoritative node and the nodes
buf += size;
}
- wait_event_interruptible(&cb_data->waitq,
- lov_read_check_status(cb_data));
+ wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
if (cb_data->flags & PTL_RPC_FL_INTR)
rc = -EINTR;
o_setup: lov_setup,
o_connect: lov_connect,
o_disconnect: lov_disconnect,
-#if 0
o_create: lov_create,
+ o_destroy: lov_destroy,
+#if 0
o_getattr: lov_getattr,
o_setattr: lov_setattr,
o_open: lov_open,
o_close: lov_close,
- o_destroy: lov_destroy,
o_brw: lov_pgcache_brw,
o_punch: lov_punch,
o_enqueue: lov_enqueue,