1)add lov_create prototype for testing mdd lov, Note: it will finally removed to share
the same api as MDS.
2)reconstruct MDS lov code to export some api, mdd could use. not finished.
#define IOC_REQUEST_CLOSE _IOWR('f', 35, long)
#define IOC_REQUEST_MAX_NR 35
+#define MDS_LOV_MD_NAME "lov"
#endif
int (*mdo_create)(const struct lu_context *, struct md_object *,
const char *child_name, struct md_object *,
const char *target_name, struct lu_attr *);
-
int (*mdo_rename)(const struct lu_context *ctxt,
struct md_object *spobj, struct md_object *tpobj,
const struct lu_fid *lf, const char *sname,
struct semaphore mgs_sem;
};
+struct md_lov_info {
+ struct obd_device *md_osc_obd; /* XXX lov_obd */
+ struct obd_uuid md_lov_uuid;
+ struct obd_export *md_osc_exp; /* XXX lov_exp */
+ struct lov_desc md_lov_desc;
+ obd_id *md_lov_objids;
+ int md_lov_objids_size;
+ __u32 md_lov_objids_in_file;
+ unsigned int md_lov_objids_dirty:1;
+ int md_lov_nextid_set;
+ struct file *md_lov_objid_filp;
+ unsigned long md_lov_objids_valid:1;
+};
+
+#define mds_osc_obd mds_lov_info.md_osc_obd
+#define mds_lov_uuid mds_lov_info.md_lov_uuid
+#define mds_osc_exp mds_lov_info.md_osc_exp
+#define mds_lov_desc mds_lov_info.md_lov_desc
+#define mds_lov_objids mds_lov_info.md_lov_objids
+#define mds_lov_objids_size mds_lov_info.md_lov_objids_size
+#define mds_lov_objids_in_file mds_lov_info.md_lov_objids_in_file
+#define mds_lov_objids_dirty mds_lov_info.md_lov_objids_dirty
+#define mds_lov_nextid_set mds_lov_info.md_lov_nextid_set
+#define mds_lov_objid_filp mds_lov_info.md_lov_objid_filp
+#define mds_lov_objids_valid mds_lov_info.md_lov_objids_valid
+
struct mds_obd {
/* NB this field MUST be first */
struct obd_device_target mds_obt;
cfs_dentry_t *mds_objects_dir;
struct llog_handle *mds_cfg_llh;
// struct llog_handle *mds_catalog;
- struct obd_device *mds_osc_obd; /* XXX lov_obd */
- struct obd_uuid mds_lov_uuid;
+ struct md_lov_info mds_lov_info;
char *mds_profile;
- struct obd_export *mds_osc_exp; /* XXX lov_exp */
- struct lov_desc mds_lov_desc;
- obd_id *mds_lov_objids;
- int mds_lov_objids_size;
- __u32 mds_lov_objids_in_file;
- unsigned int mds_lov_objids_dirty:1;
- int mds_lov_nextid_set;
- struct file *mds_lov_objid_filp;
struct file *mds_health_check_filp;
unsigned long *mds_client_bitmap;
struct semaphore mds_orphan_recovery_sem;
struct lustre_quota_info mds_quota_info;
struct semaphore mds_qonoff_sem;
struct semaphore mds_health_sem;
- unsigned long mds_lov_objids_valid:1,
- mds_fl_user_xattr:1,
+ unsigned long mds_fl_user_xattr:1,
mds_fl_acl:1;
};
if (rc)
GOTO(cleanup, rc);
+ rc = mdd_lov_create(ctxt, mdd, son);
+ if (rc)
+ GOTO(cleanup, rc);
created = 1;
rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
name, handle);
const void *buf, int buf_len, const char *name);
int mdd_lov_set_md(const struct lu_context *ctxt, struct md_object *pobj,
struct md_object *child);
+int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
+ struct mdd_object *child);
struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx);
extern struct lu_device_operations mdd_lu_ops;
static inline int lu_device_is_mdd(struct lu_device *d)
#include <lu_object.h>
#include <md_object.h>
#include <dt_object.h>
+#include <lustre_mds.h>
#include "mdd_internal.h"
RETURN(rc);
}
+static int mdd_lov_clear_orphans(struct mdd_lov_info *mli,
+ struct obd_uuid *ost_uuid)
+{
+ int rc;
+ struct obdo oa;
+ struct obd_trans_info oti = {0};
+ struct lov_stripe_md *empty_ea = NULL;
+ ENTRY;
+
+ LASSERT(mli->mdd_lov_objids != NULL);
+
+ /* This create will in fact either create or destroy: If the OST is
+ * missing objects below this ID, they will be created. If it finds
+ * objects above this ID, they will be removed. */
+ memset(&oa, 0, sizeof(oa));
+ oa.o_valid = OBD_MD_FLFLAGS;
+ oa.o_flags = OBD_FL_DELORPHAN;
+ if (ost_uuid != NULL) {
+ memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
+ oa.o_valid |= OBD_MD_FLINLINE;
+ }
+ rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa,
+ &empty_ea, &oti);
+
+ RETURN(rc);
+}
+
/* We only sync one osc at a time, so that we don't have to hold
any kind of lock on the whole mds_lov_desc, which may change
(grow) as a result of mds_lov_add_ost. This also avoids any
rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
NULL);
- if (rc != 0)
+ if (rc != 0) {
+ CERROR("failed at obd_set_info_async: %d\n", rc);
GOTO(out, rc);
+ }
+
+ rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid);
+ if (rc != 0) {
+ CERROR("failed at mds_lov_clear_orphans: %d\n", rc);
+ GOTO(out, rc);
+ }
out:
+ EXIT;
lu_device_put(ld);
- RETURN(rc);
+ return rc;
}
int mdd_lov_synchronize(void *data)
next = mdd_object_child(md2mdd_obj(obj));
rc = next->do_ops->do_xattr_get(ctxt, next, md, *md_size,
- "lov");
+ MDS_LOV_MD_NAME);
if (rc < 0) {
CERROR("Error %d reading eadata \n", rc);
} else if (rc > 0) {
int lmm_size = sizeof(lmm);
rc = mdd_get_md(ctxt, pobj, &lmm, &lmm_size, 1);
if (rc > 0) {
- rc = mdd_xattr_set(ctxt, child, lmm, lmm_size, "lov");
+ rc = mdd_xattr_set(ctxt, child, lmm, lmm_size, MDS_LOV_MD_NAME);
if (rc)
CERROR("error on copy stripe info: rc = %d\n", rc);
}
}
RETURN(rc);
}
+
+int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
+ struct mdd_object *child)
+{
+ struct mdd_lov_info *mli = &mdd->mdd_lov_info;
+ struct obdo *oa;
+ struct lov_mds_md *lmm = NULL;
+ struct lov_stripe_md *lsm = NULL;
+ int rc = 0, lmm_size;
+ ENTRY;
+
+ oa = obdo_alloc();
+
+ oa->o_uid = 0; /* must have 0 uid / gid on OST */
+ oa->o_gid = 0;
+ oa->o_mode = S_IFREG | 0600;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
+ OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+ oa->o_size = 0;
+
+ rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL);
+ if (rc)
+ GOTO(out_oa, rc);
+
+ rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm);
+ if (rc < 0) {
+ CERROR("cannot pack lsm, err = %d\n", rc);
+ GOTO(out_oa, rc);
+ }
+ lmm_size = rc;
+
+ rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size,
+ MDS_LOV_MD_NAME);
+out_oa:
+ obdo_free(oa);
+ RETURN(rc);
+}
#include "mds_internal.h"
-void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
+void md_lov_info_update_objids(struct md_lov_info *mli, obd_id *ids)
{
- struct mds_obd *mds = &obd->u.mds;
int i;
- ENTRY;
-
lock_kernel();
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
- if (ids[i] > (mds->mds_lov_objids)[i]) {
- (mds->mds_lov_objids)[i] = ids[i];
- mds->mds_lov_objids_dirty = 1;
+ for (i = 0; i < mli->md_lov_desc.ld_tgt_count; i++)
+ if (ids[i] > (mli->md_lov_objids)[i]) {
+ (mli->md_lov_objids)[i] = ids[i];
+ mli->md_lov_objids_dirty = 1;
}
unlock_kernel();
- EXIT;
+}
+EXPORT_SYMBOL(md_lov_info_update_objids);
+
+void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
+{
+ struct mds_obd *mds = &obd->u.mds;
+
+ md_lov_info_update_objids(&mds->mds_lov_info, ids);
}
static int mds_lov_read_objids(struct obd_device *obd)
RETURN(rc);
}
-int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+int md_lov_info_clear_orphans(struct md_lov_info *mli,
+ struct obd_uuid *ost_uuid)
{
int rc;
struct obdo oa;
struct lov_stripe_md *empty_ea = NULL;
ENTRY;
- LASSERT(mds->mds_lov_objids != NULL);
+ LASSERT(mli->md_lov_objids != NULL);
/* This create will in fact either create or destroy: If the OST is
* missing objects below this ID, they will be created. If it finds
memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
oa.o_valid |= OBD_MD_FLINLINE;
}
- rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
+ rc = obd_create(mli->md_osc_exp, &oa, &empty_ea, &oti);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(md_lov_info_clear_orphans);
+
+int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+{
+ return md_lov_info_clear_orphans(&mds->mds_lov_info, ost_uuid);
+}
+
+int md_lov_info_set_nextid(struct md_lov_info *mli)
+{
+ int rc;
+ ENTRY;
+
+ LASSERT(mli->md_lov_objids != NULL);
+
+ rc = obd_set_info_async(mli->md_osc_exp, strlen(KEY_NEXT_ID),
+ KEY_NEXT_ID,
+ mli->md_lov_desc.ld_tgt_count,
+ mli->md_lov_objids, NULL);
+
RETURN(rc);
+
}
+EXPORT_SYMBOL(md_lov_info_set_nextid);
/* update the LOV-OSC knowledge of the last used object id's */
int mds_lov_set_nextid(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
int rc;
- ENTRY;
LASSERT(!obd->obd_recovering);
-
- LASSERT(mds->mds_lov_objids != NULL);
-
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
- KEY_NEXT_ID,
- mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids, NULL);
+ rc = md_lov_info_set_nextid(&mds->mds_lov_info);
if (rc)
CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
obd->obd_name, rc);
-
RETURN(rc);
}
-/* Update the lov desc for a new size lov. */
-static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
+int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov)
{
- struct mds_obd *mds = &obd->u.mds;
struct lov_desc *ld;
- __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
+ __u32 size, valsize = sizeof(mli->md_lov_desc);
int rc = 0;
ENTRY;
/* The size of the LOV target table may have increased. */
size = ld->ld_tgt_count * sizeof(obd_id);
- if ((mds->mds_lov_objids_size == 0) ||
- (size > mds->mds_lov_objids_size)) {
+ if ((mli->md_lov_objids_size == 0) ||
+ (size > mli->md_lov_objids_size)) {
obd_id *ids;
/* add room by powers of 2 */
if (ids == NULL)
GOTO(out, rc = -ENOMEM);
memset(ids, 0, size);
- if (mds->mds_lov_objids_size) {
- obd_id *old_ids = mds->mds_lov_objids;
- memcpy(ids, mds->mds_lov_objids,
- mds->mds_lov_objids_size);
- mds->mds_lov_objids = ids;
- OBD_FREE(old_ids, mds->mds_lov_objids_size);
+ if (mli->md_lov_objids_size) {
+ obd_id *old_ids = mli->md_lov_objids;
+ memcpy(ids, mli->md_lov_objids,
+ mli->md_lov_objids_size);
+ mli->md_lov_objids = ids;
+ OBD_FREE(old_ids, mli->md_lov_objids_size);
}
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_size = size;
+ mli->md_lov_objids = ids;
+ mli->md_lov_objids_size = size;
}
/* Don't change the mds_lov_desc until the objids size matches the
count (paranoia) */
- mds->mds_lov_desc = *ld;
+ mli->md_lov_desc = *ld;
CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
- mds->mds_lov_desc.ld_tgt_count);
+ mli->md_lov_desc.ld_tgt_count);
+out:
+ OBD_FREE(ld, sizeof(*ld));
+ RETURN(rc);
+}
+/* Update the lov desc for a new size lov. */
+static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ __u32 stripes;
+ int rc = 0;
+ ENTRY;
+
+ rc = md_lov_info_update_desc(&mds->mds_lov_info, lov);
+ if (rc)
+ GOTO(out, rc);
+
stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
max(mds->mds_lov_desc.ld_tgt_count,
mds->mds_lov_objids_in_file));
mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
mds->mds_max_mdsize, mds->mds_max_cookiesize);
-
out:
- OBD_FREE(ld, sizeof(*ld));
RETURN(rc);
}
__u32 mlsi_index; /* index of target */
};
+
/* We only sync one osc at a time, so that we don't have to hold
any kind of lock on the whole mds_lov_desc, which may change
(grow) as a result of mds_lov_add_ost. This also avoids any