for (i = 0; i < desc->ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+ int rc2;
if (!tgt) {
CERROR("Target %s not attached\n", uuidarray[i]);
rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
recover);
- if (rc) {
- CERROR("Target %s connect error %d\n",
- uuidarray[i], rc);
- GOTO(out_disc, rc);
+
+ /* Register even if connect failed, so that we get reactivation
+ * notices.
+ */
+ rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
+ sizeof(struct obd_device *), obd, NULL);
+ if (rc2) {
+ CERROR("Target %s REGISTER_LOV error %d\n",
+ uuidarray[i], rc2);
+ GOTO(out_disc, rc2);
}
- rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
- sizeof(struct obd_device *), obd, NULL);
+
+ /* But mark failed-connect OSCs as inactive! */
if (rc) {
- CERROR("Target %s REGISTER_LOV error %d\n",
+ CDEBUG(D_INFO, "Target %s connect error %d\n",
uuidarray[i], rc);
- GOTO(out_disc, rc);
+ LASSERT(lov->tgts[i].active == 0);
+ rc = 0;
+ continue;
}
+
desc->ld_active_tgt_count++;
lov->tgts[i].active = 1;
}
goto out_local;
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
- if (!lov->tgts[i].active) {
- CERROR("Skipping disconnect for inactive OSC %s\n",
- lov->tgts[i].uuid);
- continue;
- }
-
- lov->desc.ld_active_tgt_count--;
- lov->tgts[i].active = 0;
rc = obd_disconnect(&lov->tgts[i].conn);
if (rc) {
- CERROR("Target %s disconnect error %d\n",
- lov->tgts[i].uuid, rc);
- RETURN(rc);
+ if (lov->tgts[i].active) {
+ CERROR("Target %s disconnect error %d\n",
+ lov->tgts[i].uuid, rc);
+ }
+ rc = 0;
+ }
+ if (lov->tgts[i].active) {
+ lov->desc.ld_active_tgt_count--;
+ lov->tgts[i].active = 0;
}
}
OBD_FREE(lov->tgts, lov->bufsize);
CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
lov->tgts[i].active = activate;
- if (activate)
+ if (activate) {
+ /*
+ * foreach(export)
+ * foreach(open_file)
+ * if (file_handle uses this_osc)
+ * if (has_no_filehandle)
+ * open(file_handle, this_osc);
+ */
+ /* XXX reconnect? */
lov->desc.ld_active_tgt_count++;
- else
+ } else {
+ /*
+ * Should I invalidate filehandles that refer to this OSC, so
+ * that I reopen them during reactivation?
+ */
+ /* XXX disconnect from OSC? */
lov->desc.ld_active_tgt_count--;
+ }
EXIT;
out:
ENTRY;
if (data->ioc_inllen1 < 1) {
- CERROR("osc setup requires an MDC UUID\n");
+ CERROR("LOV setup requires an MDC UUID\n");
RETURN(-EINVAL);
}
lsm = *ea;
+ /* Can't create more stripes than we have targets (incl inactive). */
+ if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
+ GOTO(out_tmp, rc = -EINVAL);
+
/* Free the user lsm if it needs to be changed, to avoid memory leaks */
if (!lsm || (lsm &&
lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
out_tmp:
obdo_free(tmp);
- return rc;
+ RETURN(rc);
out_cleanup:
while (i-- > 0) {
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+ int err;
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ /* Orphan clean up will (someday) fix this up. */
+ continue;
+ }
+
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
sizeof(lfh->lfh_handles[i]));
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
- rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
- if (rc)
- CERROR("Error destroying objid "LPX64" subobj "LPX64
- " on OST idx %d\n: rc = %d",
- oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
+ err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
+ NULL);
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("Error destroying objid "LPX64" subobj "
+ LPX64" on OST idx %d\n: rc = %d",
+ oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ if (!rc)
+ rc = err;
+ }
}
RETURN(rc);
}
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_file_handles *lfh = NULL;
- int rc = 0, i;
+ int i;
int new = 1;
ENTRY;
if (loi->loi_id == 0)
continue;
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
+
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
"%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
/* create data objects with "parent" OA */
tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
- if (err) {
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
CERROR("Error getattr objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
- if (!rc)
- rc = err;
- continue; /* XXX or break? */
+ RETURN(err);
}
lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
}
- RETURN(rc);
+
+ RETURN(0);
}
static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
static int lov_open(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm)
{
- struct obdo *tmp;
+ struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
struct lov_file_handles *lfh = NULL;
+ struct lustre_handle *handle;
int new = 1;
int rc = 0, i;
ENTRY;
oa->o_size = 0;
oa->o_blocks = 0;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
- int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0) {
+ continue;
+ }
/* create data objects with "parent" OA */
memcpy(tmp, oa, sizeof(*tmp));
tmp->o_id = loi->loi_id;
- err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
- if (err) {
+ rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+ if (rc && lov->tgts[loi->loi_ost_idx].active) {
CERROR("Error open objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
oa->o_id, lsm->lsm_oinfo[i].loi_id,
loi->loi_ost_idx, rc);
- if (!rc)
- rc = err;
+ goto out_handles;
}
lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
sizeof(lfh->lfh_handles[i]));
}
- if (tmp->o_valid & OBD_MD_FLHANDLE) {
- struct lustre_handle *handle = obdo_handle(oa);
+ handle = obdo_handle(oa);
+
+ lfh->lfh_count = lsm->lsm_stripe_count;
+ get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+
+ handle->addr = (__u64)(unsigned long)lfh;
+ handle->cookie = lfh->lfh_cookie;
+ oa->o_valid |= OBD_MD_FLHANDLE;
+ list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
- lfh->lfh_count = lsm->lsm_stripe_count;
- get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
-
- handle->addr = (__u64)(unsigned long)lfh;
- handle->cookie = lfh->lfh_cookie;
- oa->o_valid |= OBD_MD_FLHANDLE;
- list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
- } else
- goto out_handles;
-
- /* FIXME: returning an error, but having opened some objects is a bad
- * idea, since they will likely never be closed. We either
- * need to not return an error if _some_ objects could be
- * opened, and leave it to read/write to return -EIO (with
- * hopefully partial error status) or close all opened objects
- * and return an error. I think the former is preferred.
- */
out_tmp:
obdo_free(tmp);
RETURN(rc);
out_handles:
+ for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
+ int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
+
+ memcpy(tmp, oa, sizeof(*tmp));
+ tmp->o_id = loi->loi_id;
+ memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+ sizeof(lfh->lfh_handles[i]));
+
+ err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+ if (err) {
+ CERROR("Error closing objid "LPX64" subobj "LPX64
+ " on OST idx %d after open error: rc = %d\n",
+ oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ }
+ }
+
OBD_FREE(lfh->lfh_handles,
lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
out_lfh:
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
int err;
+
+ if (lov->tgts[loi->loi_ost_idx].active == 0)
+ continue;
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
RETURN(-EINVAL);
}
+ /* XXX assert that we're not in recovery */
+
if (!export || !export->exp_obd)
RETURN(-ENODEV);
struct ldlm_extent sub_ext;
struct lov_stripe_md submd;
+ *flags = 0;
sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
if (sub_ext.start == sub_ext.end)