Whamcloud - gitweb
A mostly-fix for "mknod /mnt/lustre/foofo p". It doesn't fail outright
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index 3a0b3b8..c0fd524 100644 (file)
@@ -26,130 +26,97 @@ extern struct obd_device obd_dev[MAX_OBD_DEVICES];
 
 /* obd methods */
 
-static int lov_getinfo(struct obd_device *obd, 
-                       struct lov_desc *desc, 
-                       uuid_t **uuids, 
-                       struct ptlrpc_request **request)
-{
-        struct ptlrpc_request *req;
-        struct mds_status_req *streq;
-        struct lov_obd *lov = &obd->u.lov; 
-        struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
-        int rc, size[2] = {sizeof(*streq)};
-        ENTRY;
-
-        req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
-                               MDS_LOVINFO, 1, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-        
-        *request = req;
-        streq = lustre_msg_buf(req->rq_reqmsg, 0);
-        streq->flags = HTON__u32(MDS_STATUS_LOV);
-        streq->repbuf = HTON__u32(8000);
-        
-        /* prepare for reply */ 
-        req->rq_level = LUSTRE_CONN_CON;
-        size[0] = sizeof(*desc); 
-        size[1] = 8000; 
-        req->rq_replen = lustre_msg_size(2, size);
-        
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-
-        if (!rc) {
-                memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
-                *uuids = lustre_msg_buf(req->rq_repmsg, 1);
-                lov_unpackdesc(desc); 
-        }
-        EXIT;
- out:
-        return rc;
-}
-
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
 {
-        int rc;
-        int i;
         struct ptlrpc_request *req;
         struct lov_obd *lov = &obd->u.lov;
-        uuid_t *uuidarray; 
+        struct lustre_handle mdc_conn;
+        uuid_t *uuidarray;
+        int rc;
+        int i;
 
         MOD_INC_USE_COUNT;
         rc = class_connect(conn, obd);
-        if (rc) { 
+        if (rc) {
                 MOD_DEC_USE_COUNT;
-                RETURN(rc); 
+                RETURN(rc);
         }
-        
-        rc = lov_getinfo(obd, &lov->desc, &uuidarray, &req);
-        if (rc) { 
+
+        rc = obd_connect(&mdc_conn, lov->mdcobd);
+        if (rc) {
+                CERROR("cannot connect to mdc: rc = %d\n", rc);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
+        obd_disconnect(&mdc_conn);
+
+        if (rc) {
                 CERROR("cannot get lov info %d\n", rc);
-                GOTO(out, rc); 
+                GOTO(out, rc);
         }
-        
-        if (lov->desc.ld_tgt_count > 1000) { 
+
+
+        if (lov->desc.ld_tgt_count > 1000) {
                 CERROR("configuration error: target count > 1000 (%d)\n",
                        lov->desc.ld_tgt_count);
-                GOTO(out, rc = -EINVAL); 
+                GOTO(out, rc = -EINVAL);
         }
-        
-        if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) { 
-                CERROR("lov uuid %s not on mds device (%s)\n", 
+
+        if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
+                CERROR("lov uuid %s not on mds device (%s)\n",
                        obd->obd_uuid, lov->desc.ld_uuid);
-                GOTO(out, rc = -EINVAL); 
-        }                
-            
-        if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] < 
-            sizeof(uuid_t) * lov->desc.ld_tgt_count) { 
-                CERROR("invalid uuid array returned\n");
-                GOTO(out, rc = -EINVAL); 
+                GOTO(out, rc = -EINVAL);
         }
 
-        lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count; 
-        OBD_ALLOC(lov->tgts, lov->bufsize); 
-        if (!lov->tgts) { 
-                CERROR("Out of memory\n"); 
-                GOTO(out, rc = -ENOMEM); 
+        if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
+            sizeof(uuid_t) * lov->desc.ld_tgt_count) {
+                CERROR("invalid uuid array returned\n");
+                GOTO(out, rc = -EINVAL);
         }
 
-        uuidarray = lustre_msg_buf(req->rq_repmsg, 1); 
-        for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
-                memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t)); 
+        lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count;
+        OBD_ALLOC(lov->tgts, lov->bufsize);
+        if (!lov->tgts) {
+                CERROR("Out of memory\n");
+                GOTO(out, rc = -ENOMEM);
         }
 
-        for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
+        uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
+        for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
+                memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
+
+        for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
-                if (!tgt) { 
-                        CERROR("Target %s not configured\n", uuidarray[i]); 
-                        GOTO(out_mem, rc = -EINVAL); 
+                if (!tgt) {
+                        CERROR("Target %s not configured\n", uuidarray[i]);
+                        GOTO(out_mem, rc = -EINVAL);
                 }
-                rc = obd_connect(&lov->tgts[i].conn, tgt); 
-                if (rc) { 
-                        CERROR("Target %s connect error %d\n", 
-                               uuidarray[i], rc); 
+                rc = obd_connect(&lov->tgts[i].conn, tgt);
+                if (rc) {
+                        CERROR("Target %s connect error %d\n",
+                               uuidarray[i], rc);
                         GOTO(out_mem, rc);
                 }
         }
 
  out_mem:
-        if (rc) { 
-                for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
-                        rc = obd_disconnect(&lov->tgts[i].conn);
-                        if (rc)
-                                CERROR("Target %s disconnect error %d\n", 
-                                       uuidarray[i], rc); 
+        if (rc) {
+                for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
+                        int rc2;
+                        rc2 = obd_disconnect(&lov->tgts[i].conn);
+                        if (rc2)
+                                CERROR("BAD: Target %s disconnect error %d\n",
+                                       uuidarray[i], rc2);
                 }
                 OBD_FREE(lov->tgts, lov->bufsize);
         }
  out:
-        if (rc) { 
+        if (rc)
                 class_disconnect(conn);
-        }
-        if (req)
-                ptlrpc_free_req(req); 
+
+        ptlrpc_free_req(req);
         return rc;
-        
 }
 
 static int lov_disconnect(struct lustre_handle *conn)
@@ -164,9 +131,11 @@ static int lov_disconnect(struct lustre_handle *conn)
 
         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
                 rc = obd_disconnect(&lov->tgts[i].conn);
-                if (rc)
+                if (rc) { 
                         CERROR("Target %s disconnect error %d\n", 
                                lov->tgts[i].uuid, rc); 
+                        RETURN(rc);
+                }
         }
         OBD_FREE(lov->tgts, lov->bufsize);
         lov->bufsize = 0;
@@ -209,6 +178,111 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         RETURN(rc); 
 } 
 
+
+static inline int lov_stripe_md_size(struct obd_device *obd)
+{
+        struct lov_obd *lov = &obd->u.lov;
+        int size;
+
+        size = sizeof(struct lov_stripe_md) + 
+                lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
+        return size;
+}
+
+static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
+{
+        int rc, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
+        struct lov_stripe_md *md;
+        ENTRY;
+
+        if (!ea) { 
+                CERROR("lov_create needs EA for striping information\n"); 
+                RETURN(-EINVAL); 
+        }
+
+        if (!export)
+                RETURN(-EINVAL);
+        lov = &export->exp_obd->u.lov;
+
+        oa->o_easize =  lov_stripe_md_size(export->exp_obd);
+        if (!*ea) {
+                OBD_ALLOC(*ea, oa->o_easize);
+                if (! *ea)
+                        RETURN(-ENOMEM);
+        }
+
+        md = *ea;
+        md->lmd_size = oa->o_easize;
+        md->lmd_object_id = oa->o_id;
+        if (!md->lmd_stripe_count) { 
+                md->lmd_stripe_count = lov->desc.ld_default_stripecount;
+        }
+
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                struct lov_stripe_md obj_md; 
+                struct lov_stripe_md *obj_mdp = &obj_md; 
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                tmp.o_easize = sizeof(struct lov_stripe_md);
+                rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
+                if (rc) 
+                        GOTO(out_cleanup, rc); 
+                md->lmd_objects[i].l_object_id = tmp.o_id;
+        }
+
+ out_cleanup: 
+        if (rc) { 
+                int i2, rc2;
+                for (i2 = 0 ; i2 < i ; i2++) { 
+                        /* destroy already created objects here */ 
+                        tmp.o_id = md->lmd_objects[i].l_object_id;
+                        rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
+                        if (rc2) { 
+                                CERROR("Failed to remove object from target %d\n", 
+                                       i2); 
+                        }
+                }
+        }
+        return rc;
+}
+
+static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
+struct lov_stripe_md *ea)
+{
+        int rc, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
+        struct lov_stripe_md *md;
+        ENTRY;
+
+        if (!ea) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
+        }
+
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
+
+        lov = &export->exp_obd->u.lov;
+        md = ea;
+
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
+                rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
+                if (!rc) { 
+                        CERROR("Error destroying object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+        }
+        RETURN(rc);
+}
+
 #if 0
 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
 {
@@ -279,73 +353,7 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa)
         RETURN(rc);
 }
 
-static int lov_create(struct lustre_handle *conn, struct obdo *oa)
-{
-        int rc, retval, i, offset;
-        struct obdo tmp;
-        struct lov_md md;
-        ENTRY;
-
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
-
-        md.lmd_object_id = oa->o_id;
-        md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
-
-        memset(oa->o_inline, 0, sizeof(oa->o_inline));
-        offset = sizeof(md);
-        for (i = 0; i < md.lmd_stripe_count; i++) {
-                struct lov_object_id lov_id;
-                rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
-                if (i == 0) {
-                        memcpy(oa, &tmp, sizeof(tmp));
-                        retval = rc;
-                } else if (retval != rc)
-                        CERROR("return codes didn't match (%d, %d)\n",
-                               retval, rc);
-                lov_id = (struct lov_object_id *)(oa->o_inline + offset);
-                lov_id->l_device_id = i;
-                lov_id->l_object_id = tmp.o_id;
-                offset += sizeof(*lov_id);
-        }
-        memcpy(oa->o_inline, &md, sizeof(md));
-
-        return rc;
-}
-
-static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
-{
-        int rc, retval, i, offset;
-        struct obdo tmp;
-        struct lov_md *md;
-        struct lov_object_id lov_id;
-        ENTRY;
-
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
-
-        md = (struct lov_md *)oa->o_inline;
 
-        memcpy(&tmp, oa, sizeof(tmp));
-
-        offset = sizeof(md);
-        for (i = 0; i < md->lmd_stripe_count; i++) {
-                struct lov_object_id *lov_id;
-                lov_id = (struct lov_object_id *)(oa->o_inline + offset);
-
-                tmp.o_id = lov_id->l_object_id;
-
-                rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
-                if (i == 0)
-                        retval = rc;
-                else if (retval != rc)
-                        CERROR("return codes didn't match (%d, %d)\n",
-                               retval, rc);
-                offset += sizeof(*lov_id);
-        }
-
-        return rc;
-}
 
 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
  * we can send this 'punch' to just the authoritative node and the nodes
@@ -467,8 +475,7 @@ static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
                 buf += size;
         }
 
-        wait_event_interruptible(&cb_data->waitq,
-                                 lov_read_check_status(cb_data));
+        wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
         if (cb_data->flags & PTL_RPC_FL_INTR)
                 rc = -EINTR;
 
@@ -557,13 +564,13 @@ struct obd_ops lov_obd_ops = {
         o_setup:       lov_setup,
         o_connect:     lov_connect,
         o_disconnect:  lov_disconnect,
-#if 0
         o_create:      lov_create,
+        o_destroy:     lov_destroy,
+#if 0
         o_getattr:     lov_getattr,
         o_setattr:     lov_setattr,
         o_open:        lov_open,
         o_close:       lov_close,
-        o_destroy:     lov_destroy,
         o_brw:         lov_pgcache_brw,
         o_punch:       lov_punch,
         o_enqueue:     lov_enqueue,