static void mdd_trans_stop(const struct lu_context *ctxt,
struct mdd_device *mdd, struct thandle *handle);
static struct dt_object* mdd_object_child(struct mdd_object *o);
-static struct lu_device_operations mdd_lu_ops;
static void mdd_lock(const struct lu_context *ctx,
struct mdd_object *obj, enum dt_lock_mode mode);
static void mdd_unlock(const struct lu_context *ctx,
return info;
}
-static int lu_device_is_mdd(struct lu_device *d)
-{
- /*
- * XXX for now. Tags in lu_device_type->ldt_something are needed.
- */
- return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdd_lu_ops);
-}
-
-static struct mdd_device* lu2mdd_dev(struct lu_device *d)
-{
- LASSERT(lu_device_is_mdd(d));
- return container_of0(d, struct mdd_device, mdd_md_dev.md_lu_dev);
-}
-
-static inline struct lu_device *mdd2lu_dev(struct mdd_device *d)
-{
- return (&d->mdd_md_dev.md_lu_dev);
-}
-
-static struct mdd_object *mdd_obj(struct lu_object *o)
-{
- LASSERT(lu_device_is_mdd(o->lo_dev));
- return container_of0(o, struct mdd_object, mod_obj.mo_lu);
-}
-
-static struct mdd_device* mdo2mdd(struct md_object *mdo)
-{
- return lu2mdd_dev(mdo->mo_lu.lo_dev);
-}
-
-static struct mdd_object* mdo2mddo(struct md_object *mdo)
-{
- return container_of0(mdo, struct mdd_object, mod_obj);
-}
-
-static inline struct dt_device_operations *mdd_child_ops(struct mdd_device *d)
-{
- return d->mdd_child->dd_ops;
-}
-
static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
const struct lu_object_header *hdr,
struct lu_device *d)
RETURN(rc);
}
-static struct lu_device_operations mdd_lu_ops = {
+struct lu_device_operations mdd_lu_ops = {
.ldo_object_alloc = mdd_object_alloc,
- .ldo_process_config = mdd_process_config
+ .ldo_process_config = mdd_process_config,
+ .ldo_notify = mdd_notify
};
static struct lu_object_operations mdd_lu_obj_ops = {
struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
struct lu_attr *lu_attr = NULL;
obd_id *ids;
- loff_t off = 0;
int i, rc;
ENTRY;
GOTO(out, rc);
if (lu_attr->la_size == 0)
- RETURN(0);
+ GOTO(out, rc);
OBD_ALLOC(ids, lu_attr->la_size);
if (ids == NULL)
lov_info->mdd_lov_objids = ids;
lov_info->mdd_lov_objids_size = lu_attr->la_size;
+#if 0
rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids,
lu_attr->la_size, &off);
if (rc < 0) {
CERROR("Error reading objids %d\n", rc);
RETURN(rc);
}
-
+#endif
lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
int mdd_lov_write_objids(const struct lu_context *ctxt,
struct mdd_lov_info *lov_info)
{
- struct dt_object *ids_obj = lov_info->mdd_lov_objid_obj;
- loff_t off = 0;
- int i, rc, tgts;
+ int i, rc = 0, tgts;
ENTRY;
if (!lov_info->mdd_lov_objids_dirty)
for (i = 0; i < tgts; i++)
CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
lov_info->mdd_lov_objids[i], i);
-
+#if 0
rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
lov_info->mdd_lov_objids,
tgts * sizeof(obd_id), &off);
lov_info->mdd_lov_objids_dirty = 0;
rc = 0;
}
-
+#endif
RETURN(rc);
}
lov_info->mdd_lov_obd = ERR_PTR(rc);
RETURN(rc);
}
-#if 0
- /*FIXME: register observer of lov, need obd method,
- * but mdd is not obd now*/
- rc = md_register_observer(mds->mds_osc_obd, obd);
- if (rc) {
- CERROR("MDS cannot register as observer of LOV %s (%d)\n",
- lov_name, rc);
- GOTO(err_discon, rc);
- }
-#endif
+
/* open and test the lov objd file */
rc = mdd_lov_read_objids(ctxt, mdd);
rc = mds_postrecov(obd);
#endif
out:
+ if (rc)
+ obd_disconnect(lov_info->mdd_lov_obd->obd_self_export);
RETURN(rc);
}
{
struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
struct dt_object *obj_id;
- char *lov_name = NULL;
+ struct obd_device *obd = NULL;
+ char *lov_name = NULL, *srv = NULL;
int rc = 0;
ENTRY;
rc = mdd_lov_connect(ctxt, mdd, lov_name);
if (rc)
GOTO(out, rc);
+
+ /*register the obd server for lov*/
+ srv = lustre_cfg_string(cfg, 0);
+ obd = class_name2obd(srv);
+ if (obd == NULL) {
+ CERROR("No such OBD %s\n", srv);
+ LBUG();
+ }
+ rc = obd_register_observer(lov_info->mdd_lov_obd, obd);
+ if (rc) {
+ CERROR("MDS cannot register as observer of LOV %s (%d)\n",
+ lov_name, rc);
+ GOTO(out, rc);
+ }
EXIT;
out:
if (rc)
RETURN(rc);
}
+
+struct mdd_lov_sync_info {
+ struct lu_context *mlsi_ctxt;
+ struct lu_device *mlsi_ld; /* the lov device to sync */
+ struct obd_device *mlsi_watched; /* target osc */
+ __u32 mlsi_index; /* index of target */
+};
+
+#define MDSLOV_NO_INDEX -1
+
+/* Inform MDS about new/updated target */
+static int mdd_lov_update_mds(struct lu_context *ctxt,
+ struct lu_device *ld,
+ struct obd_device *watched,
+ __u32 idx)
+{
+ struct mdd_device *mdd = lu2mdd_dev(ld);
+ struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+ int old_count;
+ int rc = 0;
+ ENTRY;
+
+ old_count = lov_info->mdd_lov_desc.ld_tgt_count;
+ rc = mdd_lov_update_desc(ctxt, mdd);
+ if (rc)
+ RETURN(rc);
+
+ /*
+ * idx is set as data from lov_notify.
+ * XXX did not consider recovery here
+ */
+ if (idx != MDSLOV_NO_INDEX) {
+ if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) {
+ CERROR("index %d > count %d!\n", idx,
+ lov_info->mdd_lov_desc.ld_tgt_count);
+ RETURN(-EINVAL);
+ }
+
+ if (idx >= lov_info->mdd_lov_objids_in_file) {
+ /* We never read this lastid; ask the osc */
+ obd_id lastid;
+ __u32 size = sizeof(lastid);
+ rc = obd_get_info(watched->obd_self_export,
+ strlen("last_id"),
+ "last_id", &size, &lastid);
+ if (rc)
+ RETURN(rc);
+ lov_info->mdd_lov_objids[idx] = lastid;
+ lov_info->mdd_lov_objids_dirty = 1;
+ mdd_lov_write_objids(ctxt, lov_info);
+ } else {
+ /* We have read this lastid from disk; tell the osc.
+ Don't call this during recovery. */
+ rc = mdd_lov_set_nextid(mdd);
+ }
+
+ CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
+ lov_info->mdd_lov_objids[idx], idx);
+ }
+
+ RETURN(rc);
+}
+
+/* We only sync one osc at a time, so that we don't have to hold
+ any kind of lock on the whole mds_lov_desc, which may change
+ (grow) as a result of mds_lov_add_ost. This also avoids any
+ kind of mismatch between the lov_desc and the mds_lov_desc,
+ which are not in lock-step during lov_add_obd */
+static int __mdd_lov_synchronize(void *data)
+{
+ struct mdd_lov_sync_info *mlsi = data;
+ struct lu_device *ld = mlsi->mlsi_ld;
+ struct obd_device *watched = mlsi->mlsi_watched;
+ struct lu_context *ctxt = mlsi->mlsi_ctxt;
+ struct mdd_device *mdd = lu2mdd_dev(ld);
+ struct obd_uuid *uuid;
+ __u32 idx = mlsi->mlsi_index;
+ int rc = 0;
+ ENTRY;
+
+ OBD_FREE(mlsi, sizeof(*mlsi));
+
+ LASSERT(ld);
+ LASSERT(watched);
+ uuid = &watched->u.cli.cl_target_uuid;
+ LASSERT(uuid);
+
+ rc = mdd_lov_update_mds(ctxt, ld, watched, idx);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
+ strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
+ NULL);
+ if (rc != 0)
+ GOTO(out, rc);
+out:
+ lu_device_put(ld);
+ RETURN(rc);
+}
+
+int mdd_lov_synchronize(void *data)
+{
+ struct mdd_lov_sync_info *mlsi = data;
+ char name[20];
+
+ sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index);
+ ptlrpc_daemonize(name);
+
+ RETURN(__mdd_lov_synchronize(data));
+}
+
+int mdd_lov_start_synchronize(const struct lu_context *ctxt,
+ struct lu_device *ld,
+ struct obd_device *watched,
+ void *data, int nonblock)
+{
+ struct mdd_lov_sync_info *mlsi;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(watched);
+
+ OBD_ALLOC(mlsi, sizeof(*mlsi));
+ if (mlsi == NULL)
+ RETURN(-ENOMEM);
+
+ mlsi->mlsi_ctxt = (struct lu_context *)ctxt;
+ mlsi->mlsi_ld = ld;
+ mlsi->mlsi_watched = watched;
+ if (data)
+ mlsi->mlsi_index = *(__u32 *)data;
+ else
+ mlsi->mlsi_index = MDSLOV_NO_INDEX;
+
+ /* Although class_export_get(obd->obd_self_export) would lock
+ the MDS in place, since it's only a self-export
+ it doesn't lock the LOV in place. The LOV can be disconnected
+ during MDS precleanup, leaving nothing for __mdd_lov_synchronize.
+ Simply taking an export ref on the LOV doesn't help, because it's
+ still disconnected. Taking an obd reference insures that we don't
+ disconnect the LOV. This of course means a cleanup won't
+ finish for as long as the sync is blocking. */
+ lu_device_get(ld);
+
+ if (nonblock) {
+ /* Synchronize in the background */
+ rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi,
+ CLONE_VM | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("error starting mdd_lov_synchronize: %d\n", rc);
+ lu_device_put(ld);
+ } else {
+ CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n",
+ mlsi->mlsi_index, rc);
+ rc = 0;
+ }
+ } else {
+ rc = __mdd_lov_synchronize((void *)mlsi);
+ }
+
+ RETURN(rc);
+}
+
+int mdd_notify(const struct lu_context *ctxt, struct lu_device *ld,
+ struct obd_device *watched, enum obd_notify_event ev,
+ void *data)
+{
+ struct mdd_device *mdd = lu2mdd_dev(ld);
+ struct obd_device *obd = ld->ld_site->ls_top_dev->ld_obd;
+ int rc = 0;
+ ENTRY;
+
+ switch (ev) {
+ /* We only handle these: */
+ case OBD_NOTIFY_ACTIVE:
+ case OBD_NOTIFY_SYNC:
+ case OBD_NOTIFY_SYNC_NONBLOCK:
+ break;
+ default:
+ RETURN(0);
+ }
+
+ CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
+
+ if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
+ CERROR("unexpected notification of %s %s!\n",
+ watched->obd_type->typ_name, watched->obd_name);
+ RETURN(-EINVAL);
+ }
+
+ /*FIXME later, Recovery stuff still not be designed */
+ if (obd->obd_recovering) {
+ CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
+ obd->obd_name,
+ obd_uuid2str(&watched->u.cli.cl_target_uuid));
+ /* We still have to fix the lov descriptor for ost's added
+ after the mdt in the config log. They didn't make it into
+ mds_lov_connect. */
+ rc = mdd_lov_update_desc(ctxt, mdd);
+ RETURN(rc);
+ }
+
+ rc = mdd_lov_start_synchronize(ctxt, ld, watched, data,
+ !(ev == OBD_NOTIFY_SYNC));
+ RETURN(rc);
+}