Whamcloud - gitweb
Landing of b_recovery (at last).
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index c840368..15fe873 100644 (file)
@@ -158,6 +158,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         for (i = 0; i < desc->ld_tgt_count; i++) {
                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+                int rc2;
 
                 if (!tgt) {
                         CERROR("Target %s not attached\n", uuidarray[i]);
@@ -171,18 +172,27 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
                                  recover);
-                if (rc) {
-                        CERROR("Target %s connect error %d\n",
-                               uuidarray[i], rc);
-                        GOTO(out_disc, rc);
+
+                /* Register even if connect failed, so that we get reactivation
+                 * notices.
+                 */
+                rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
+                                    sizeof(struct obd_device *), obd, NULL);
+                if (rc2) {
+                        CERROR("Target %s REGISTER_LOV error %d\n",
+                               uuidarray[i], rc2);
+                        GOTO(out_disc, rc2);
                 }
-                rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
-                                   sizeof(struct obd_device *), obd, NULL);
+
+                /* But mark failed-connect OSCs as inactive! */
                 if (rc) {
-                        CERROR("Target %s REGISTER_LOV error %d\n",
+                        CDEBUG(D_INFO, "Target %s connect error %d\n",
                                uuidarray[i], rc);
-                        GOTO(out_disc, rc);
+                        LASSERT(lov->tgts[i].active == 0);
+                        rc = 0;
+                        continue;
                 }
+                
                 desc->ld_active_tgt_count++;
                 lov->tgts[i].active = 1;
         }
@@ -227,19 +237,17 @@ static int lov_disconnect(struct lustre_handle *conn)
                 goto out_local;
 
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->tgts[i].active) {
-                        CERROR("Skipping disconnect for inactive OSC %s\n",
-                               lov->tgts[i].uuid);
-                        continue;
-                }
-
-                lov->desc.ld_active_tgt_count--;
-                lov->tgts[i].active = 0;
                 rc = obd_disconnect(&lov->tgts[i].conn);
                 if (rc) {
-                        CERROR("Target %s disconnect error %d\n",
-                               lov->tgts[i].uuid, rc);
-                        RETURN(rc);
+                        if (lov->tgts[i].active) {
+                                CERROR("Target %s disconnect error %d\n",
+                                       lov->tgts[i].uuid, rc);
+                        }
+                        rc = 0;
+                }
+                if (lov->tgts[i].active) {
+                        lov->desc.ld_active_tgt_count--;
+                        lov->tgts[i].active = 0;
                 }
         }
         OBD_FREE(lov->tgts, lov->bufsize);
@@ -313,10 +321,24 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
 
         lov->tgts[i].active = activate;
-        if (activate)
+        if (activate) {
+                /*
+                 * foreach(export)
+                 *     foreach(open_file)
+                 *         if (file_handle uses this_osc)
+                 *             if (has_no_filehandle)
+                 *                 open(file_handle, this_osc);
+                 */
+                /* XXX reconnect? */
                 lov->desc.ld_active_tgt_count++;
-        else
+        } else {
+                /*
+                 * Should I invalidate filehandles that refer to this OSC, so
+                 * that I reopen them during reactivation?
+                 */
+                /* XXX disconnect from OSC? */
                 lov->desc.ld_active_tgt_count--;
+        }
 
         EXIT;
  out:
@@ -332,7 +354,7 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         ENTRY;
 
         if (data->ioc_inllen1 < 1) {
-                CERROR("osc setup requires an MDC UUID\n");
+                CERROR("LOV setup requires an MDC UUID\n");
                 RETURN(-EINVAL);
         }
 
@@ -400,6 +422,10 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
 
         lsm = *ea;
 
+        /* Can't create more stripes than we have targets (incl inactive). */
+        if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
+                GOTO(out_tmp, rc = -EINVAL);
+
         /* Free the user lsm if it needs to be changed, to avoid memory leaks */
         if (!lsm || (lsm &&
                      lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
@@ -494,7 +520,7 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
 
  out_tmp:
         obdo_free(tmp);
-        return rc;
+        RETURN(rc);
 
  out_cleanup:
         while (i-- > 0) {
@@ -547,6 +573,12 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+                int err;
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        /* Orphan clean up will (someday) fix this up. */
+                        continue;
+                }
+
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
@@ -554,11 +586,15 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
                                sizeof(lfh->lfh_handles[i]));
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
-                rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
-                if (rc)
-                        CERROR("Error destroying objid "LPX64" subobj "LPX64
-                               " on OST idx %d\n: rc = %d",
-                               oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
+                err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
+                                  NULL);
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
+                        CERROR("Error destroying objid "LPX64" subobj "
+                               LPX64" on OST idx %d\n: rc = %d",
+                               oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+                        if (!rc)
+                                rc = err;
+                }
         }
         RETURN(rc);
 }
@@ -620,7 +656,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_file_handles *lfh = NULL;
-        int rc = 0, i;
+        int i;
         int new = 1;
         ENTRY;
 
@@ -649,6 +685,9 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 if (loi->loi_id == 0)
                         continue;
 
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
+
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
                 /* create data objects with "parent" OA */
@@ -661,17 +700,16 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
-                if (err) {
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("Error getattr objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
-                        if (!rc)
-                                rc = err;
-                        continue; /* XXX or break? */
+                        RETURN(err);
                 }
                 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
         }
-        RETURN(rc);
+
+        RETURN(0);
 }
 
 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
@@ -744,11 +782,12 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *lsm)
 {
-        struct obdo *tmp;
+        struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_file_handles *lfh = NULL;
+        struct lustre_handle *handle;
         int new = 1;
         int rc = 0, i;
         ENTRY;
@@ -783,20 +822,22 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         oa->o_size = 0;
         oa->o_blocks = 0;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                int err;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        continue;
+                }
 
                 /* create data objects with "parent" OA */
                 memcpy(tmp, oa, sizeof(*tmp));
                 tmp->o_id = loi->loi_id;
 
-                err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
-                if (err) {
+                rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+                if (rc && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("Error open objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
                                loi->loi_ost_idx, rc);
-                        if (!rc)
-                                rc = err;
+                        goto out_handles;
                 }
 
                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
@@ -806,31 +847,40 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                                sizeof(lfh->lfh_handles[i]));
         }
 
-        if (tmp->o_valid & OBD_MD_FLHANDLE) {
-                struct lustre_handle *handle = obdo_handle(oa);
+        handle = obdo_handle(oa);
+        
+        lfh->lfh_count = lsm->lsm_stripe_count;
+        get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+        
+        handle->addr = (__u64)(unsigned long)lfh;
+        handle->cookie = lfh->lfh_cookie;
+        oa->o_valid |= OBD_MD_FLHANDLE;
+        list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
 
-                lfh->lfh_count = lsm->lsm_stripe_count;
-                get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
-
-                handle->addr = (__u64)(unsigned long)lfh;
-                handle->cookie = lfh->lfh_cookie;
-                oa->o_valid |= OBD_MD_FLHANDLE;
-                list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
-        } else
-                goto out_handles;
-
-        /* FIXME: returning an error, but having opened some objects is a bad
-         *        idea, since they will likely never be closed.  We either
-         *        need to not return an error if _some_ objects could be
-         *        opened, and leave it to read/write to return -EIO (with
-         *        hopefully partial error status) or close all opened objects
-         *        and return an error.  I think the former is preferred.
-         */
 out_tmp:
         obdo_free(tmp);
         RETURN(rc);
 
 out_handles:
+        for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
+                int err;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
+
+                memcpy(tmp, oa, sizeof(*tmp));
+                tmp->o_id = loi->loi_id;
+                memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+                       sizeof(lfh->lfh_handles[i]));
+
+                err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+                if (err) {
+                        CERROR("Error closing objid "LPX64" subobj "LPX64
+                               " on OST idx %d after open error: rc = %d\n",
+                               oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+                }
+        }
+       
         OBD_FREE(lfh->lfh_handles,
                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
 out_lfh:
@@ -870,6 +920,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
+                
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
 
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
@@ -1119,6 +1172,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 RETURN(-EINVAL);
         }
 
+        /* XXX assert that we're not in recovery */
+
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
@@ -1128,6 +1183,7 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 struct ldlm_extent sub_ext;
                 struct lov_stripe_md submd;
 
+                *flags = 0;
                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
                 if (sub_ext.start == sub_ext.end)