int mds_reint_rec(struct mds_update_record *r, int offset,
struct ptlrpc_request *req, struct lustre_handle *);
+int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli,
+ char *lov_name, struct obd_uuid *uuid,
+ struct md_lov_ops *mlo, const void *ctxt);
+int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli,
+ struct obd_device *watched, enum obd_notify_event ev,
+ void *data);
+int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli,
+ struct obd_device *watched,
+ void *data, int nonblock, const void *ctxt);
/* ioctls for trying requests */
#define IOC_REQUEST_TYPE 'f'
#define IOC_REQUEST_MIN_NR 30
struct semaphore mgs_sem;
};
+struct md_lov_info;
+
+struct md_lov_ops {
+ int (*ml_read_objids)(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt);
+ int (*ml_write_objids)(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt);
+};
struct md_lov_info {
- struct obd_device *md_osc_obd; /* XXX lov_obd */
+ struct obd_device *md_lov_obd; /* XXX lov_obd */
struct obd_uuid md_lov_uuid;
- struct obd_export *md_osc_exp; /* XXX lov_exp */
+ struct obd_export *md_lov_exp; /* XXX lov_exp */
struct lov_desc md_lov_desc;
obd_id *md_lov_objids;
int md_lov_objids_size;
__u32 md_lov_objids_in_file;
unsigned int md_lov_objids_dirty:1;
int md_lov_nextid_set;
- struct file *md_lov_objid_filp;
+ void *md_lov_objid_obj;
+ struct lu_fid md_lov_objid_fid;
unsigned long md_lov_objids_valid:1;
+ int md_lov_max_mdsize;
+ int md_lov_max_cookiesize;
+ struct semaphore md_lov_orphan_recovery_sem;
+ struct md_lov_ops *md_lov_ops;
};
-#define mds_osc_obd mds_lov_info.md_osc_obd
+#define mds_osc_obd mds_lov_info.md_lov_obd
#define mds_lov_uuid mds_lov_info.md_lov_uuid
-#define mds_osc_exp mds_lov_info.md_osc_exp
+#define mds_osc_exp mds_lov_info.md_lov_exp
#define mds_lov_desc mds_lov_info.md_lov_desc
#define mds_lov_objids mds_lov_info.md_lov_objids
#define mds_lov_objids_size mds_lov_info.md_lov_objids_size
#define mds_lov_objids_in_file mds_lov_info.md_lov_objids_in_file
#define mds_lov_objids_dirty mds_lov_info.md_lov_objids_dirty
#define mds_lov_nextid_set mds_lov_info.md_lov_nextid_set
-#define mds_lov_objid_filp mds_lov_info.md_lov_objid_filp
+#define mds_lov_objid_filp mds_lov_info.md_lov_objid_obj
#define mds_lov_objids_valid mds_lov_info.md_lov_objids_valid
+#define mds_max_mdsize mds_lov_info.md_lov_max_mdsize
+#define mds_max_cookiesize mds_lov_info.md_lov_max_cookiesize
+#define mds_orphan_recovery_sem mds_lov_info.md_lov_orphan_recovery_sem
struct mds_obd {
/* NB this field MUST be first */
struct ptlrpc_service *mds_readpage_service;
struct vfsmount *mds_vfsmnt;
cfs_dentry_t *mds_fid_de;
- int mds_max_mdsize;
- int mds_max_cookiesize;
struct file *mds_rcvd_filp;
spinlock_t mds_transno_lock;
__u64 mds_last_transno;
char *mds_profile;
struct file *mds_health_check_filp;
unsigned long *mds_client_bitmap;
- struct semaphore mds_orphan_recovery_sem;
struct upcall_cache *mds_group_hash;
struct lustre_quota_info mds_quota_info;
struct dt_device;
-struct mdd_lov_info {
- struct obd_device *mdd_lov_obd;
- struct obd_uuid mdd_lov_uuid;
- struct lov_desc mdd_lov_desc;
- obd_id *mdd_lov_objids;
- int mdd_lov_objids_size;
- __u32 mdd_lov_objids_in_file;
- int mdd_lov_nextid_set;
- struct lu_fid mdd_lov_objid_fid;
- struct dt_object *mdd_lov_objid_obj;
- unsigned int mdd_lov_objids_dirty:1;
-};
-
struct mdd_device {
struct md_device mdd_md_dev;
struct dt_device *mdd_child;
- struct mdd_lov_info mdd_lov_info;
+ struct md_lov_info mdd_lov_info;
struct dt_device mdd_lov_dev;
- int mdd_max_mdsize;
- int mdd_max_cookiesize;
struct lu_fid mdd_root_fid;
};
#include <md_object.h>
#include <dt_object.h>
#include <lustre_mds.h>
+#include <lustre/lustre_idl.h>
#include "mdd_internal.h"
static const char mdd_lov_objid_name[] = "lov_objid";
-static int mdd_lov_read_objids(const struct lu_context *ctxt,
- struct mdd_device *mdd)
+static int mdd_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt)
{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
- struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
+ struct dt_object *obj_ids = mli->md_lov_objid_obj;
struct lu_attr *lu_attr = &mdd_ctx_info(ctxt)->mti_attr;
obd_id *ids;
int i, rc;
ENTRY;
- LASSERT(!lov_info->mdd_lov_objids_size);
- LASSERT(!lov_info->mdd_lov_objids_dirty);
+ LASSERT(!mli->md_lov_objids_size);
+ LASSERT(!mli->md_lov_objids_dirty);
/* Read everything in the file, even if our current lov desc
has fewer targets. Old targets not in the lov descriptor
if (ids == NULL)
RETURN(-ENOMEM);
- lov_info->mdd_lov_objids = ids;
- lov_info->mdd_lov_objids_size = lu_attr->la_size;
+ mli->md_lov_objids = ids;
+ mli->md_lov_objids_size = lu_attr->la_size;
#if 0
rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids,
RETURN(rc);
}
#endif
- lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
+ mli->md_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
- for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
+ for (i = 0; i < mli->md_lov_objids_in_file; i++) {
CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
- lov_info->mdd_lov_objids[i], i);
+ mli->md_lov_objids[i], i);
}
out:
RETURN(0);
}
-/* Update the lov desc for a new size lov. */
-static int mdd_lov_update_desc(const struct lu_context *ctxt,
- struct mdd_device *mdd)
-{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
- __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc);
- struct lov_desc *ld;
- struct obd_device *lov_obd = lov_info->mdd_lov_obd;
- int rc = 0;
- ENTRY;
-
- ld = &mdd_ctx_info(ctxt)->mti_ld;
-
- rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1,
- KEY_LOVDESC, &valsize, ld);
- if (rc)
- GOTO(out, rc);
-
- /* The size of the LOV target table may have increased. */
- size = ld->ld_tgt_count * sizeof(obd_id);
- if ((lov_info->mdd_lov_objids_size == 0) ||
- (size > lov_info->mdd_lov_objids_size)) {
- obd_id *ids;
-
- /* add room by powers of 2 */
- size = 1;
- while (size < ld->ld_tgt_count)
- size = size << 1;
- size = size * sizeof(obd_id);
-
- OBD_ALLOC(ids, size);
- if (ids == NULL)
- GOTO(out, rc = -ENOMEM);
- memset(ids, 0, size);
- if (lov_info->mdd_lov_objids_size) {
- obd_id *old_ids = lov_info->mdd_lov_objids;
- memcpy(ids, lov_info->mdd_lov_objids,
- lov_info->mdd_lov_objids_size);
- lov_info->mdd_lov_objids = ids;
- OBD_FREE(old_ids, lov_info->mdd_lov_objids_size);
- }
- lov_info->mdd_lov_objids = ids;
- lov_info->mdd_lov_objids_size = size;
- }
-
- /* Don't change the mds_lov_desc until the objids size matches the
- count (paranoia) */
- lov_info->mdd_lov_desc = *ld;
- CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
- lov_info->mdd_lov_desc.ld_tgt_count);
-
- stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
- max(lov_info->mdd_lov_desc.ld_tgt_count,
- lov_info->mdd_lov_objids_in_file));
- mdd->mdd_max_mdsize = lov_mds_md_size(stripes);
- mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie);
- CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
- mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize);
-out:
- RETURN(rc);
-}
-
-int mdd_lov_write_objids(const struct lu_context *ctxt,
- struct mdd_lov_info *lov_info)
+int mdd_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt)
{
int i, rc = 0, tgts;
ENTRY;
- if (!lov_info->mdd_lov_objids_dirty)
+ if (!mli->md_lov_objids_dirty)
RETURN(0);
- tgts = max(lov_info->mdd_lov_desc.ld_tgt_count,
- lov_info->mdd_lov_objids_in_file);
+ tgts = max(mli->md_lov_desc.ld_tgt_count,
+ mli->md_lov_objids_in_file);
if (!tgts)
RETURN(0);
for (i = 0; i < tgts; i++)
CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
- lov_info->mdd_lov_objids[i], i);
+ mli->md_lov_objids[i], i);
#if 0
rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
- lov_info->mdd_lov_objids,
+ mli->mdd_lov_objids,
tgts * sizeof(obd_id), &off);
if (rc >= 0) {
- lov_info->mdd_lov_objids_dirty = 0;
+ mli->mdd_lov_objids_dirty = 0;
rc = 0;
}
#endif
RETURN(rc);
}
-static int mdd_lov_connect(const struct lu_context *ctxt,
- struct mdd_device *mdd, char *lov_name)
-{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
- struct lustre_handle conn = {0,};
- struct obd_connect_data *data;
- int rc = 0;
- ENTRY;
-
- /*connect to obd*/
- OBD_ALLOC(data, sizeof(*data));
- if (data == NULL)
- RETURN(-ENOMEM);
- data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
- OBD_CONNECT_REQPORTAL;
- data->ocd_version = LUSTRE_VERSION_CODE;
- /* NB: lov_connect() needs to fill in .ocd_index for each OST */
- rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid,
- data);
- OBD_FREE(data, sizeof(*data));
- if (rc) {
- CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
- lov_info->mdd_lov_obd = ERR_PTR(rc);
- RETURN(rc);
- }
-
- /* open and test the lov objd file */
-
- rc = mdd_lov_read_objids(ctxt, mdd);
- if (rc) {
- CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
- GOTO(out, rc);
- }
-
- rc = mdd_lov_update_desc(ctxt, mdd);
- if (rc)
- GOTO(out, rc);
-#if 0
- /* tgt_count may be 0! */
- rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
- if (rc) {
- CERROR("failed to initialize catalog %d\n", rc);
- GOTO(err_reg, rc);
- }
-#endif
- /* If we're mounting this code for the first time on an existing FS,
- * we need to populate the objids array from the real OST values */
- if (lov_info->mdd_lov_desc.ld_tgt_count >
- lov_info->mdd_lov_objids_in_file) {
- int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count;
- int i;
-
- rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export,
- strlen("last_id"), "last_id", &size,
- lov_info->mdd_lov_objids);
- if (!rc) {
- for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++)
- CWARN("got last object "LPU64" from OST %d\n",
- lov_info->mdd_lov_objids[i], i);
- lov_info->mdd_lov_objids_dirty = 1;
- rc = mdd_lov_write_objids(ctxt, lov_info);
- if (rc)
- CERROR("got last objids from OSTs, but error "
- "writing objids file: %d\n", rc);
- }
- }
- /* I want to see a callback happen when the OBD moves to a
- * "For General Use" state, and that's when we'll call
- * set_nextid(). The class driver can help us here, because
- * it can use the obd_recovering flag to determine when the
- * the OBD is full available. */
-#if 0
- if (!obd->obd_recovering)
- rc = mds_postrecov(obd);
-#endif
-out:
- if (rc)
- obd_disconnect(lov_info->mdd_lov_obd->obd_self_export);
- RETURN(rc);
-}
+struct md_lov_ops mdd_lov_ops = {
+ .ml_read_objids = mdd_lov_read_objids,
+ .ml_write_objids = mdd_lov_write_objids,
+};
int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd)
{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+ struct md_lov_info *mli = &mdd->mdd_lov_info;
- dt_object_fini(lov_info->mdd_lov_objid_obj);
+ obd_register_observer(mli->md_lov_obd, NULL);
+
+ if (mli->md_lov_exp) {
+ obd_disconnect(mli->md_lov_exp);
+ mli->md_lov_exp = NULL;
+ }
+
+ dt_object_fini(mli->md_lov_objid_obj);
return 0;
}
int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
struct lustre_cfg *cfg)
{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
+ struct md_lov_info *lov_info = &mdd->mdd_lov_info;
struct dt_object *obj_id;
struct obd_device *obd = NULL;
char *lov_name = NULL, *srv = NULL;
int rc = 0;
ENTRY;
- if (IS_ERR(lov_info->mdd_lov_obd))
- RETURN(PTR_ERR(lov_info->mdd_lov_obd));
+ if (IS_ERR(lov_info->md_lov_obd))
+ RETURN(PTR_ERR(lov_info->md_lov_obd));
lov_name = lustre_cfg_string(cfg, 3);
LASSERTF(lov_name != NULL, "MDD need lov \n");
- lov_info->mdd_lov_obd = class_name2obd(lov_name);
- if (!lov_info->mdd_lov_obd) {
- CERROR("MDS cannot locate LOV %s\n", lov_name);
- lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN);
- RETURN(-ENOTCONN);
- }
obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name,
- &lov_info->mdd_lov_objid_fid);
+ &lov_info->md_lov_objid_fid);
if (IS_ERR(obj_id)){
rc = PTR_ERR(obj_id);
RETURN(rc);
}
LASSERT(obj_id != NULL);
- lov_info->mdd_lov_objid_obj = obj_id;
-
- obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1));
+ lov_info->md_lov_objid_obj = obj_id;
- rc = mdd_lov_connect(ctxt, mdd, lov_name);
- if (rc)
- GOTO(out, rc);
-
- /*register the obd server for lov*/
srv = lustre_cfg_string(cfg, 0);
obd = class_name2obd(srv);
if (obd == NULL) {
CERROR("No such OBD %s\n", srv);
LBUG();
}
- rc = obd_register_observer(lov_info->mdd_lov_obd, obd);
- if (rc) {
- CERROR("MDS cannot register as observer of LOV %s (%d)\n",
- lov_name, rc);
- GOTO(out, rc);
- }
- EXIT;
-out:
+ rc = md_lov_connect(obd, lov_info, lov_name,
+ &obd->obd_uuid, &mdd_lov_ops, ctxt);
if (rc)
mdd_lov_fini(ctxt, mdd);
- return rc;
-}
-
-/* update the LOV-OSC knowledge of the last used object id's */
-int mdd_lov_set_nextid(struct mdd_device *mdd)
-{
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
- int rc;
- ENTRY;
-
- LASSERT(lov_info->mdd_lov_objids != NULL);
-
- rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export,
- strlen(KEY_NEXT_ID), KEY_NEXT_ID,
- lov_info->mdd_lov_desc.ld_tgt_count,
- lov_info->mdd_lov_objids, NULL);
-
- if (rc)
- CERROR ("mdd_lov_set_nextid failed (%d)\n", rc);
-
- RETURN(rc);
-}
-
-struct mdd_lov_sync_info {
- struct lu_context *mlsi_ctxt;
- struct lu_device *mlsi_ld; /* the lov device to sync */
- struct obd_device *mlsi_watched; /* target osc */
- __u32 mlsi_index; /* index of target */
-};
-
-#define MDSLOV_NO_INDEX -1
-
-/* Inform MDS about new/updated target */
-static int mdd_lov_update_mds(struct lu_context *ctxt,
- struct lu_device *ld,
- struct obd_device *watched,
- __u32 idx)
-{
- struct mdd_device *mdd = lu2mdd_dev(ld);
- struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
- int old_count;
- int rc = 0;
- ENTRY;
-
- old_count = lov_info->mdd_lov_desc.ld_tgt_count;
- rc = mdd_lov_update_desc(ctxt, mdd);
- if (rc)
- RETURN(rc);
-
- /*
- * idx is set as data from lov_notify.
- * XXX did not consider recovery here
- */
- if (idx != MDSLOV_NO_INDEX) {
- if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) {
- CERROR("index %d > count %d!\n", idx,
- lov_info->mdd_lov_desc.ld_tgt_count);
- RETURN(-EINVAL);
- }
-
- if (idx >= lov_info->mdd_lov_objids_in_file) {
- /* We never read this lastid; ask the osc */
- obd_id lastid;
- __u32 size = sizeof(lastid);
- rc = obd_get_info(watched->obd_self_export,
- strlen("last_id"),
- "last_id", &size, &lastid);
- if (rc)
- RETURN(rc);
- lov_info->mdd_lov_objids[idx] = lastid;
- lov_info->mdd_lov_objids_dirty = 1;
- mdd_lov_write_objids(ctxt, lov_info);
- } else {
- /* We have read this lastid from disk; tell the osc.
- Don't call this during recovery. */
- rc = mdd_lov_set_nextid(mdd);
- }
-
- CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
- lov_info->mdd_lov_objids[idx], idx);
- }
-
- RETURN(rc);
-}
-
-static int mdd_lov_clear_orphans(struct mdd_lov_info *mli,
- struct obd_uuid *ost_uuid)
-{
- int rc;
- struct obdo oa;
- struct obd_trans_info oti = {0};
- struct lov_stripe_md *empty_ea = NULL;
- ENTRY;
-
- LASSERT(mli->mdd_lov_objids != NULL);
-
- /* This create will in fact either create or destroy: If the OST is
- * missing objects below this ID, they will be created. If it finds
- * objects above this ID, they will be removed. */
- memset(&oa, 0, sizeof(oa));
- oa.o_valid = OBD_MD_FLFLAGS;
- oa.o_flags = OBD_FL_DELORPHAN;
- if (ost_uuid != NULL) {
- memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
- oa.o_valid |= OBD_MD_FLINLINE;
- }
- rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa,
- &empty_ea, &oti);
-
- RETURN(rc);
-}
-
-/* We only sync one osc at a time, so that we don't have to hold
- any kind of lock on the whole mds_lov_desc, which may change
- (grow) as a result of mds_lov_add_ost. This also avoids any
- kind of mismatch between the lov_desc and the mds_lov_desc,
- which are not in lock-step during lov_add_obd */
-static int __mdd_lov_synchronize(void *data)
-{
- struct mdd_lov_sync_info *mlsi = data;
- struct lu_device *ld = mlsi->mlsi_ld;
- struct obd_device *watched = mlsi->mlsi_watched;
- struct lu_context *ctxt = mlsi->mlsi_ctxt;
- struct mdd_device *mdd = lu2mdd_dev(ld);
- struct obd_uuid *uuid;
- __u32 idx = mlsi->mlsi_index;
- int rc = 0;
- ENTRY;
-
- OBD_FREE(mlsi, sizeof(*mlsi));
-
- LASSERT(ld);
- LASSERT(watched);
- uuid = &watched->u.cli.cl_target_uuid;
- LASSERT(uuid);
-
- rc = mdd_lov_update_mds(ctxt, ld, watched, idx);
- if (rc != 0)
- GOTO(out, rc);
-
- rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
- strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
- NULL);
- if (rc != 0) {
- CERROR("failed at obd_set_info_async: %d\n", rc);
- GOTO(out, rc);
- }
-
- rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid);
- if (rc != 0) {
- CERROR("failed at mds_lov_clear_orphans: %d\n", rc);
- GOTO(out, rc);
- }
-out:
- EXIT;
- lu_device_put(ld);
- return rc;
-}
-
-int mdd_lov_synchronize(void *data)
-{
- struct mdd_lov_sync_info *mlsi = data;
- char name[20];
-
- sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index);
- ptlrpc_daemonize(name);
-
- RETURN(__mdd_lov_synchronize(data));
-}
-
-int mdd_lov_start_synchronize(const struct lu_context *ctxt,
- struct lu_device *ld,
- struct obd_device *watched,
- void *data, int nonblock)
-{
- struct mdd_lov_sync_info *mlsi;
- int rc;
-
- ENTRY;
-
- LASSERT(watched);
-
- OBD_ALLOC(mlsi, sizeof(*mlsi));
- if (mlsi == NULL)
- RETURN(-ENOMEM);
-
- mlsi->mlsi_ctxt = (struct lu_context *)ctxt;
- mlsi->mlsi_ld = ld;
- mlsi->mlsi_watched = watched;
- if (data)
- mlsi->mlsi_index = *(__u32 *)data;
- else
- mlsi->mlsi_index = MDSLOV_NO_INDEX;
-
- /* Although class_export_get(obd->obd_self_export) would lock
- the MDS in place, since it's only a self-export
- it doesn't lock the LOV in place. The LOV can be disconnected
- during MDS precleanup, leaving nothing for __mdd_lov_synchronize.
- Simply taking an export ref on the LOV doesn't help, because it's
- still disconnected. Taking an obd reference insures that we don't
- disconnect the LOV. This of course means a cleanup won't
- finish for as long as the sync is blocking. */
- lu_device_get(ld);
-#if 0
- if (nonblock) {
- /* Synchronize in the background */
- rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi,
- CLONE_VM | CLONE_FILES);
- if (rc < 0) {
- CERROR("error starting mdd_lov_synchronize: %d\n", rc);
- lu_device_put(ld);
- } else {
- CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n",
- mlsi->mlsi_index, rc);
- rc = 0;
- }
- } else {
- rc = __mdd_lov_synchronize((void *)mlsi);
- }
-#else
- /*FIXME: Did not implement the nonblock lov sync here. because ctxt can not
- * be shared, maybe we need ref_count for ctxt */
- rc = __mdd_lov_synchronize((void *)mlsi);
-#endif
RETURN(rc);
}
int rc = 0;
ENTRY;
- switch (ev) {
- /* We only handle these: */
- case OBD_NOTIFY_ACTIVE:
- case OBD_NOTIFY_SYNC:
- case OBD_NOTIFY_SYNC_NONBLOCK:
- break;
- default:
- RETURN(0);
- }
-
- CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
-
- if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
- CERROR("unexpected notification of %s %s!\n",
- watched->obd_type->typ_name, watched->obd_name);
- RETURN(-EINVAL);
- }
-
- /*FIXME later, Recovery stuff still not be designed */
- if (obd->obd_recovering) {
- CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
- obd->obd_name,
- obd_uuid2str(&watched->u.cli.cl_target_uuid));
- /* We still have to fix the lov descriptor for ost's added
- after the mdt in the config log. They didn't make it into
- mds_lov_connect. */
- rc = mdd_lov_update_desc(ctxt, mdd);
+ rc = md_lov_notity_pre(obd, &mdd->mdd_lov_info, watched, ev, data);
+ if (rc) {
+ if (rc == -ENOENT || rc == -EBUSY)
+ rc = 0;
RETURN(rc);
}
- rc = mdd_lov_start_synchronize(ctxt, ld, watched, data,
- !(ev == OBD_NOTIFY_SYNC));
+ rc = md_lov_start_synchronize(obd, &mdd->mdd_lov_info, watched, data,
+ !(ev == OBD_NOTIFY_SYNC), ctxt);
+
RETURN(rc);
}
int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
struct mdd_object *child)
{
- struct mdd_lov_info *mli = &mdd->mdd_lov_info;
+ struct md_lov_info *mli = &mdd->mdd_lov_info;
struct obdo *oa;
struct lov_mds_md *lmm = NULL;
struct lov_stripe_md *lsm = NULL;
OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
oa->o_size = 0;
- rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL);
+ rc = obd_create(mli->md_lov_exp, oa, &lsm, NULL);
if (rc)
GOTO(out_oa, rc);
- rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm);
+ rc = obd_packmd(mli->md_lov_exp, &lmm, lsm);
if (rc < 0) {
CERROR("cannot pack lsm, err = %d\n", rc);
GOTO(out_oa, rc);
}
lmm_size = rc;
-
- rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size,
- MDS_LOV_MD_NAME);
+ rc = 0;
+ /*FIXME: did not set MD here */
out_oa:
obdo_free(oa);
RETURN(rc);
filp_close(mds->mds_health_check_filp, 0))
CERROR("can't close %s after error\n", HEALTH_CHECK);
err_lov_objid:
- if (mds->mds_lov_objid_filp && filp_close(mds->mds_lov_objid_filp, 0))
+ if (mds->mds_lov_objid_filp &&
+ filp_close((struct file *)mds->mds_lov_objid_filp, 0))
CERROR("can't close %s after error\n", LOV_OBJID);
err_client:
class_disconnect_exports(obd);
CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
}
if (mds->mds_lov_objid_filp) {
- rc = filp_close(mds->mds_lov_objid_filp, 0);
+ rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
mds->mds_lov_objid_filp = NULL;
if (rc)
CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
/* mds/mds_lov.c */
int mds_lov_connect(struct obd_device *obd, char * lov_name);
int mds_lov_disconnect(struct obd_device *obd);
-int mds_lov_write_objids(struct obd_device *obd);
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt);
int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
+void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
int mds_lov_set_nextid(struct obd_device *obd);
int mds_lov_start_synchronize(struct obd_device *obd,
struct obd_device *watched,
}
unlock_kernel();
}
-EXPORT_SYMBOL(md_lov_info_update_objids);
void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
{
md_lov_info_update_objids(&mds->mds_lov_info, ids);
}
-static int mds_lov_read_objids(struct obd_device *obd)
+static int mds_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt)
{
- struct mds_obd *mds = &obd->u.mds;
+ struct file *filp = (struct file *)mli->md_lov_objid_obj;
obd_id *ids;
loff_t off = 0;
int i, rc, size;
ENTRY;
- LASSERT(!mds->mds_lov_objids_size);
- LASSERT(!mds->mds_lov_objids_dirty);
+ LASSERT(!mli->md_lov_objids_size);
+ LASSERT(!mli->md_lov_objids_dirty);
/* Read everything in the file, even if our current lov desc
has fewer targets. Old targets not in the lov descriptor
during mds setup may still have valid objids. */
- size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size;
+ size = filp->f_dentry->d_inode->i_size;
if (size == 0)
RETURN(0);
OBD_ALLOC(ids, size);
if (ids == NULL)
RETURN(-ENOMEM);
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_size = size;
+ mli->md_lov_objids = ids;
+ mli->md_lov_objids_size = size;
- rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
+ rc = fsfilt_read_record(obd, filp, ids, size, &off);
if (rc < 0) {
CERROR("Error reading objids %d\n", rc);
RETURN(rc);
}
- mds->mds_lov_objids_in_file = size / sizeof(*ids);
+ mli->md_lov_objids_in_file = size / sizeof(*ids);
- for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
+ for (i = 0; i < mli->md_lov_objids_in_file; i++) {
CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
- mds->mds_lov_objids[i], i);
+ mli->md_lov_objids[i], i);
}
RETURN(0);
}
-int mds_lov_write_objids(struct obd_device *obd)
+int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt)
{
- struct mds_obd *mds = &obd->u.mds;
+ struct file *filp = (struct file *)mli->md_lov_objid_obj;
loff_t off = 0;
int i, rc, tgts;
ENTRY;
- if (!mds->mds_lov_objids_dirty)
+ if (!mli->md_lov_objids_dirty)
RETURN(0);
- tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
+ tgts = max(mli->md_lov_desc.ld_tgt_count, mli->md_lov_objids_in_file);
if (!tgts)
RETURN(0);
for (i = 0; i < tgts; i++)
CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
- mds->mds_lov_objids[i], i);
+ mli->md_lov_objids[i], i);
- rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
- mds->mds_lov_objids, tgts * sizeof(obd_id),
+ rc = fsfilt_write_record(obd, filp, mli->md_lov_objids,
+ tgts * sizeof(obd_id),
&off, 0);
if (rc >= 0) {
- mds->mds_lov_objids_dirty = 0;
+ mli->md_lov_objids_dirty = 0;
rc = 0;
}
RETURN(rc);
}
-int md_lov_info_clear_orphans(struct md_lov_info *mli,
- struct obd_uuid *ost_uuid)
+struct md_lov_ops mli_ops = {
+ .ml_read_objids = mds_lov_read_objids,
+ .ml_write_objids = mds_lov_write_objids,
+};
+
+int md_lov_clear_orphans(struct md_lov_info *mli, struct obd_uuid *ost_uuid)
{
int rc;
struct obdo oa;
memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
oa.o_valid |= OBD_MD_FLINLINE;
}
- rc = obd_create(mli->md_osc_exp, &oa, &empty_ea, &oti);
+ rc = obd_create(mli->md_lov_exp, &oa, &empty_ea, &oti);
RETURN(rc);
}
-EXPORT_SYMBOL(md_lov_info_clear_orphans);
int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
{
- return md_lov_info_clear_orphans(&mds->mds_lov_info, ost_uuid);
+ return md_lov_clear_orphans(&mds->mds_lov_info, ost_uuid);
}
-int md_lov_info_set_nextid(struct md_lov_info *mli)
+/* update the LOV-OSC knowledge of the last used object id's */
+static int md_lov_info_set_nextid(struct obd_device *obd,
+ struct md_lov_info *mli)
{
int rc;
ENTRY;
-
+
+ LASSERT(!obd->obd_recovering);
LASSERT(mli->md_lov_objids != NULL);
- rc = obd_set_info_async(mli->md_osc_exp, strlen(KEY_NEXT_ID),
+ rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_NEXT_ID),
KEY_NEXT_ID,
mli->md_lov_desc.ld_tgt_count,
mli->md_lov_objids, NULL);
-
RETURN(rc);
-
}
-EXPORT_SYMBOL(md_lov_info_set_nextid);
-/* update the LOV-OSC knowledge of the last used object id's */
int mds_lov_set_nextid(struct obd_device *obd)
{
- struct mds_obd *mds = &obd->u.mds;
- int rc;
-
- LASSERT(!obd->obd_recovering);
-
- rc = md_lov_info_set_nextid(&mds->mds_lov_info);
- if (rc)
- CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
- obd->obd_name, rc);
- RETURN(rc);
+ struct md_lov_info *mli = &obd->u.mds.mds_lov_info;
+ return md_lov_info_set_nextid(obd, mli);
}
int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov)
{
struct lov_desc *ld;
- __u32 size, valsize = sizeof(mli->md_lov_desc);
+ __u32 size, stripes, valsize = sizeof(mli->md_lov_desc);
int rc = 0;
ENTRY;
mli->md_lov_desc = *ld;
CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
mli->md_lov_desc.ld_tgt_count);
-out:
- OBD_FREE(ld, sizeof(*ld));
- RETURN(rc);
-}
-
-/* Update the lov desc for a new size lov. */
-static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
-{
- struct mds_obd *mds = &obd->u.mds;
- __u32 stripes;
- int rc = 0;
- ENTRY;
- rc = md_lov_info_update_desc(&mds->mds_lov_info, lov);
- if (rc)
- GOTO(out, rc);
-
stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
- max(mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids_in_file));
- mds->mds_max_mdsize = lov_mds_md_size(stripes);
- mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+ max(mli->md_lov_desc.ld_tgt_count,
+ mli->md_lov_objids_in_file));
+
+ mli->md_lov_max_mdsize = lov_mds_md_size(stripes);
+ mli->md_lov_max_cookiesize = stripes * sizeof(struct llog_cookie);
CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
- mds->mds_max_mdsize, mds->mds_max_cookiesize);
+ mli->md_lov_max_mdsize, mli->md_lov_max_cookiesize);
+
out:
+ OBD_FREE(ld, sizeof(*ld));
RETURN(rc);
}
-
#define MDSLOV_NO_INDEX -1
-
/* Inform MDS about new/updated target */
-static int mds_lov_update_mds(struct obd_device *obd,
+static int mds_lov_update_mds(struct obd_device *obd,
+ struct md_lov_info *mli,
struct obd_device *watched,
- __u32 idx)
+ __u32 idx, const void *ctxt)
{
- struct mds_obd *mds = &obd->u.mds;
int old_count;
- int rc = 0;
+ int rc;
ENTRY;
- old_count = mds->mds_lov_desc.ld_tgt_count;
- rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ old_count = mli->md_lov_desc.ld_tgt_count;
+ rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
if (rc)
RETURN(rc);
CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
- idx, obd->obd_recovering, obd->obd_async_recov, old_count,
- mds->mds_lov_desc.ld_tgt_count);
+ idx, obd->obd_recovering, obd->obd_async_recov, old_count,
+ mli->md_lov_desc.ld_tgt_count);
/* idx is set as data from lov_notify. */
if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
- if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+ if (idx >= mli->md_lov_desc.ld_tgt_count) {
CERROR("index %d > count %d!\n", idx,
- mds->mds_lov_desc.ld_tgt_count);
+ mli->md_lov_desc.ld_tgt_count);
RETURN(-EINVAL);
}
- if (idx >= mds->mds_lov_objids_in_file) {
+ if (idx >= mli->md_lov_objids_in_file) {
/* We never read this lastid; ask the osc */
obd_id lastid;
__u32 size = sizeof(lastid);
"last_id", &size, &lastid);
if (rc)
RETURN(rc);
- mds->mds_lov_objids[idx] = lastid;
- mds->mds_lov_objids_dirty = 1;
- mds_lov_write_objids(obd);
+ mli->md_lov_objids[idx] = lastid;
+ mli->md_lov_objids_dirty = 1;
+ mli->md_lov_ops->ml_write_objids(obd, mli, ctxt);
} else {
/* We have read this lastid from disk; tell the osc.
Don't call this during recovery. */
- rc = mds_lov_set_nextid(obd);
+ rc = md_lov_info_set_nextid(obd, mli);
}
CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[idx], idx);
+ mli->md_lov_objids[idx], idx);
}
-
+#if 0
+ /*FIXME: Do not support llog in mdd, so disable this temporarily*/
/* If we added a target we have to reconnect the llogs */
/* Only do this at first add (idx), or the first time after recovery */
if (idx != MDSLOV_NO_INDEX || 1/*FIXME*/) {
CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
/* These two must be atomic */
- down(&mds->mds_orphan_recovery_sem);
+ down(&mli->md_lov_orphan_recovery_sem);
obd_llog_finish(obd, old_count);
- llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
- up(&mds->mds_orphan_recovery_sem);
+ llog_cat_initialize(obd, mli->md_lov_desc.ld_tgt_count);
+ up(&mli->md_lov_orphan_recovery_sem);
}
-
+#endif
RETURN(rc);
}
-/* update the LOV-OSC knowledge of the last used object id's */
-int mds_lov_connect(struct obd_device *obd, char * lov_name)
+int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli,
+ char *lov_name, struct obd_uuid *uuid,
+ struct md_lov_ops *mlo, const void *ctxt)
{
- struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
struct obd_connect_data *data;
- int rc, i;
- ENTRY;
+ int rc;
- if (IS_ERR(mds->mds_osc_obd))
- RETURN(PTR_ERR(mds->mds_osc_obd));
+ if (IS_ERR(mli->md_lov_obd))
+ RETURN(PTR_ERR(mli->md_lov_obd));
- if (mds->mds_osc_obd)
+ if (mli->md_lov_obd)
RETURN(0);
- mds->mds_osc_obd = class_name2obd(lov_name);
- if (!mds->mds_osc_obd) {
+ mli->md_lov_obd = class_name2obd(lov_name);
+ if (!mli->md_lov_obd) {
CERROR("MDS cannot locate LOV %s\n", lov_name);
- mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
+ mli->md_lov_obd = ERR_PTR(-ENOTCONN);
RETURN(-ENOTCONN);
}
+ mli->md_lov_ops = mlo;
+
OBD_ALLOC(data, sizeof(*data));
if (data == NULL)
RETURN(-ENOMEM);
data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
OBD_CONNECT_REQPORTAL;
data->ocd_version = LUSTRE_VERSION_CODE;
+
/* NB: lov_connect() needs to fill in .ocd_index for each OST */
- rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
+ rc = obd_connect(&conn, mli->md_lov_obd, uuid, data);
OBD_FREE(data, sizeof(*data));
if (rc) {
CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
- mds->mds_osc_obd = ERR_PTR(rc);
- RETURN(rc);
+ GOTO(out, rc);
}
- mds->mds_osc_exp = class_conn2export(&conn);
+ mli->md_lov_exp = class_conn2export(&conn);
- rc = obd_register_observer(mds->mds_osc_obd, obd);
+ rc = obd_register_observer(mli->md_lov_obd, obd);
if (rc) {
CERROR("MDS cannot register as observer of LOV %s (%d)\n",
lov_name, rc);
- GOTO(err_discon, rc);
+ GOTO(out, rc);
}
-
- rc = mds_lov_read_objids(obd);
+
+ rc = mli->md_lov_ops->ml_read_objids(obd, mli, ctxt);
if (rc) {
CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
- GOTO(err_reg, rc);
+ GOTO(out, rc);
}
- rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
if (rc)
- GOTO(err_reg, rc);
+ GOTO(out, rc);
+out:
+ RETURN(rc);
+}
+EXPORT_SYMBOL(md_lov_connect);
- /* tgt_count may be 0! */
- rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
- if (rc) {
- CERROR("failed to initialize catalog %d\n", rc);
- GOTO(err_reg, rc);
- }
+int md_lov_update_objids(struct obd_device *obd, struct md_lov_info *mli,
+ const void *ctxt)
+{
+ int rc = 0, i;
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
- if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
- int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
- rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
- "last_id", &size, mds->mds_lov_objids);
+ if (mli->md_lov_desc.ld_tgt_count > mli->md_lov_objids_in_file) {
+ int size = sizeof(obd_id) * mli->md_lov_desc.ld_tgt_count;
+ rc = obd_get_info(mli->md_lov_exp, strlen("last_id"),
+ "last_id", &size, mli->md_lov_objids);
if (!rc) {
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+ for (i = 0; i < mli->md_lov_desc.ld_tgt_count; i++)
CWARN("got last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[i], i);
- mds->mds_lov_objids_dirty = 1;
- rc = mds_lov_write_objids(obd);
+ mli->md_lov_objids[i], i);
+ mli->md_lov_objids_dirty = 1;
+ rc = mli->md_lov_ops->ml_write_objids(obd, mli, ctxt);
if (rc)
CERROR("got last objids from OSTs, but error "
"writing objids file: %d\n", rc);
}
}
+ return rc;
+}
+
+/* update the LOV-OSC knowledge of the last used object id's */
+int mds_lov_connect(struct obd_device *obd, char * lov_name)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct md_lov_info *mli = &mds->mds_lov_info;
+ int rc;
+ ENTRY;
+
+ rc = md_lov_connect(obd, mli, lov_name, &obd->obd_uuid, &mli_ops,
+ NULL);
+ if (rc)
+ GOTO(err_reg, rc);
+ /* tgt_count may be 0! */
+ rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
+ if (rc) {
+ CERROR("failed to initialize catalog %d\n", rc);
+ GOTO(err_reg, rc);
+ }
+
+ /* If we're mounting this code for the first time on an existing FS,
+ * we need to populate the objids array from the real OST values */
+ rc = md_lov_update_objids(obd, mli, NULL);
+
/* I want to see a callback happen when the OBD moves to a
* "For General Use" state, and that's when we'll call
* set_nextid(). The class driver can help us here, because
err_reg:
obd_register_observer(mds->mds_osc_obd, NULL);
-err_discon:
- obd_disconnect(mds->mds_osc_exp);
- mds->mds_osc_exp = NULL;
- mds->mds_osc_obd = ERR_PTR(rc);
+ if (mli->md_lov_exp) {
+ obd_disconnect(mli->md_lov_exp);
+ mli->md_lov_exp = NULL;
+ }
+ mli->md_lov_obd = ERR_PTR(rc);
RETURN(rc);
}
}
struct mds_lov_sync_info {
- struct obd_device *mlsi_obd; /* the lov device to sync */
- struct obd_device *mlsi_watched; /* target osc */
- __u32 mlsi_index; /* index of target */
+ struct obd_device *mlsi_obd; /* the lov device to sync */
+ struct md_lov_info *mlsi_mli;
+ struct obd_device *mlsi_watched; /* target osc */
+ __u32 mlsi_index; /* index of target */
+ const void *mlsi_ctxt;
};
struct mds_lov_sync_info *mlsi = data;
struct obd_device *obd = mlsi->mlsi_obd;
struct obd_device *watched = mlsi->mlsi_watched;
- struct mds_obd *mds = &obd->u.mds;
+ struct md_lov_info *mli = mlsi->mlsi_mli;
+ const void *ctxt = mlsi->mlsi_ctxt;
struct obd_uuid *uuid;
__u32 idx = mlsi->mlsi_index;
int rc = 0;
uuid = &watched->u.cli.cl_target_uuid;
LASSERT(uuid);
- rc = mds_lov_update_mds(obd, watched, idx);
+ rc = mds_lov_update_mds(obd, mli, watched, idx, ctxt);
if (rc != 0)
GOTO(out, rc);
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
+ rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_MDS_CONN),
KEY_MDS_CONN, 0, uuid, NULL);
if (rc != 0)
GOTO(out, rc);
-
+#if 0
+ /*disable for not support llog in mdd*/
rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
mds->mds_lov_desc.ld_tgt_count,
NULL, NULL, uuid);
obd->obd_name, rc);
GOTO(out, rc);
}
-
+#endif
LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
obd->obd_name, obd_uuid2str(uuid));
if (obd->obd_stopping)
GOTO(out, rc = -ENODEV);
- rc = mds_lov_clear_orphans(mds, uuid);
+ rc = md_lov_clear_orphans(mli, uuid);
if (rc != 0) {
- CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
+ CERROR("%s: failed at md_lov_clear_orphans: %d\n",
obd->obd_name, rc);
GOTO(out, rc);
}
RETURN(__mds_lov_synchronize(data));
}
-int mds_lov_start_synchronize(struct obd_device *obd,
- struct obd_device *watched,
- void *data, int nonblock)
+int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli,
+ struct obd_device *watched,
+ void *data, int nonblock, const void *ctxt)
{
struct mds_lov_sync_info *mlsi;
int rc;
mlsi->mlsi_obd = obd;
mlsi->mlsi_watched = watched;
+ mlsi->mlsi_mli = mli;
+ mlsi->mlsi_ctxt = ctxt;
if (data)
mlsi->mlsi_index = *(__u32 *)data;
else
disconnect the LOV. This of course means a cleanup won't
finish for as long as the sync is blocking. */
class_incref(obd);
-
+#if 0
if (nonblock) {
/* Synchronize in the background */
rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
} else {
rc = __mds_lov_synchronize((void *)mlsi);
}
-
+#else
+ rc = __mds_lov_synchronize((void *)mlsi);
+#endif
RETURN(rc);
}
+EXPORT_SYMBOL(md_lov_start_synchronize);
-int mds_notify(struct obd_device *obd, struct obd_device *watched,
- enum obd_notify_event ev, void *data)
+int mds_lov_start_synchronize(struct obd_device *obd,
+ struct obd_device *watched,
+ void *data, int nonblock)
+{
+ return md_lov_start_synchronize(obd, &obd->u.mds.mds_lov_info,
+ watched, data, nonblock, NULL);
+}
+
+int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli,
+ struct obd_device *watched, enum obd_notify_event ev,
+ void *data)
{
int rc = 0;
- ENTRY;
switch (ev) {
/* We only handle these: */
case OBD_NOTIFY_SYNC_NONBLOCK:
break;
default:
- RETURN(0);
+ RETURN(-ENOENT);
}
CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
/* We still have to fix the lov descriptor for ost's added
after the mdt in the config log. They didn't make it into
mds_lov_connect. */
- rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
+ rc = md_lov_info_update_desc(mli, mli->md_lov_exp);
+ RETURN(-EBUSY);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(md_lov_notity_pre);
+
+int mds_notify(struct obd_device *obd, struct obd_device *watched,
+ enum obd_notify_event ev, void *data)
+{
+ int rc = 0;
+ struct md_lov_info *mli = &obd->u.mds.mds_lov_info;
+ ENTRY;
+
+ rc = md_lov_notity_pre(obd, mli, watched, ev, data);
+ if (rc) {
+ if (rc == -ENOENT || rc == -EBUSY)
+ rc = 0;
RETURN(rc);
}
LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
rc = mds_lov_start_synchronize(obd, watched, data,
!(ev == OBD_NOTIFY_SYNC));
-
+
lquota_recovery(quota_interface, obd);
-
+
RETURN(rc);
}
"wrote trans #"LPU64" rc %d client %s at idx %u: err = %d",
transno, rc, mcd->mcd_uuid, med->med_lr_idx, err);
- err = mds_lov_write_objids(obd);
+ err = mds_lov_write_objids(obd, &mds->mds_lov_info, NULL);
if (err) {
log_pri = D_ERROR;
if (rc == 0)