}
static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
- int active)
+ int active, void *data)
{
int rc;
struct obd_uuid *uuid;
if (obd->obd_observer)
/* Pass the notification up the chain. */
- rc = obd_notify(obd->obd_observer, watched, active);
+ rc = obd_notify(obd->obd_observer, watched, active, data);
RETURN(rc);
}
* say caller that everything is okay. Real connection will be performed
* later. */
static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid)
+ struct obd_uuid *cluuid, unsigned long connect_flags)
{
struct lmv_obd *lmv = &obd->u.lmv;
struct obd_export *exp;
}
lmv->cluuid = *cluuid;
+ lmv->connect_flags = connect_flags;
lmv->connected = 0;
lmv->exp = exp;
+ sema_init(&lmv->init_sem, 1);
RETURN(0);
}
}
/* Performs a check if passed obd is connected. If no - connect it. */
-int lmv_check_connect(struct obd_device *obd) {
+int lmv_check_connect(struct obd_device *obd)
+{
struct lmv_obd *lmv = &obd->u.lmv;
struct obd_uuid *cluuid;
struct lmv_tgt_desc *tgts;
if (lmv->connected)
return 0;
-
+
+ down(&lmv->init_sem);
+ if (lmv->connected) {
+ up(&lmv->init_sem);
+ return 0;
+ }
+
lmv->connected = 1;
cluuid = &lmv->cluuid;
exp = lmv->exp;
GOTO(out_disc, rc = -EINVAL);
}
- rc = obd_connect(&conn, tgt_obd, &lmv_osc_uuid);
+ rc = obd_connect(&conn, tgt_obd, &lmv_osc_uuid, lmv->connect_flags);
if (rc) {
CERROR("Target %s connect error %d\n",
tgts->uuid.uuid, rc);
}
lmv_set_timeouts(obd);
-
class_export_put(exp);
+ up(&lmv->init_sem);
return 0;
out_disc:
"error %d\n", uuid.uuid, i, rc2);
}
class_disconnect(exp, 0);
+ up(&lmv->init_sem);
RETURN (rc);
}
if (!lmv->connected)
class_export_put(exp);
rc = class_disconnect(exp, 0);
+ if (lmv->refcount == 0)
+ lmv->connected = 0;
RETURN(rc);
}
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
int err;
- err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp,
- len, karg, uarg);
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obddev->obd_name, i);
+ continue;
+ }
+
+ err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
if (err) {
if (lmv->tgts[i].active) {
CERROR("error: iocontrol MDC %s on MDT"
struct lmv_desc *desc;
struct obd_uuid *uuids;
struct lmv_tgt_desc *tgts;
+ struct obd_device *tgt_obd;
struct lustre_cfg *lcfg = buf;
struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
OBD_FREE(lmv->tgts, lmv->bufsize);
}
+ tgt_obd = class_find_client_obd(&lmv->tgts->uuid, LUSTRE_MDC_NAME,
+ &obd->obd_uuid);
+ if (!tgt_obd) {
+ CERROR("Target %s not attached\n", lmv->tgts->uuid.uuid);
+ RETURN(-EINVAL);
+ }
+
+ rc = obd_llog_init(obd, &obd->obd_llogs, tgt_obd, 0, NULL);
+ if (rc) {
+ CERROR("failed to setup llogging subsystems\n");
+ }
+
RETURN(rc);
}
RETURN(rc);
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+ continue;
+ }
+
rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age);
if (rc) {
CERROR("can't stat MDS #%d (%s)\n", i,
RETURN(rc);
LASSERT(i < lmv->desc.ld_tgt_count);
-
+
rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid,
ea_size, request);
if (rc)
(unsigned long)fid->mds, (unsigned long)fid->id,
(unsigned long)fid->generation, obj ? "(splitted)" : "");
+ /* if object is splitted, then we loop over all the slaves and gather
+ * size attribute. In ideal world we would have to gather also mds field
+ * from all slaves, as object is spread over the cluster and this is
+ * definitely interesting information and it is not good to loss it,
+ * but...*/
if (obj) {
- /* we have to loop over dirobjs here and gather attrs for all
- * the slaves. */
-#warning "attrs gathering here"
+ struct mds_body *body;
+
+ if (*request == NULL) {
+ lmv_put_obj(obj);
+ RETURN(rc);
+ }
+
+ body = lustre_msg_buf((*request)->rq_repmsg, 0,
+ sizeof(*body));
+ LASSERT(body != NULL);
+
+ lmv_lock_obj(obj);
+
+ for (i = 0; i < obj->objcount; i++) {
+
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n",
+ obd->obd_name, i);
+ continue;
+ }
+
+ /* skip master obj. */
+ if (fid_equal(&obj->fid, &obj->objs[i].fid))
+ continue;
+
+ body->size += obj->objs[i].size;
+ }
+
+ lmv_unlock_obj(obj);
lmv_put_obj(obj);
}
obj = lmv_grab_obj(obd, pfid);
if (obj) {
/* directory is splitted. look for right mds for this name. */
- mds = raw_name2idx(obj->objcount, name, len);
+ mds = raw_name2idx(obj->hashtype, obj->objcount, name, len);
+ mds = obj->objs[mds].fid.mds;
lmv_put_obj(obj);
}
rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cfid, it, data);
if (IS_ERR(obj))
rc = PTR_ERR(obj);
+ lmv_put_obj(obj);
obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
cleanup:
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mds_body *mds_body;
+ struct mds_body *body;
struct lmv_obj *obj;
- int rc, mds;
+ int rc, mds, loop = 0;
ENTRY;
rc = lmv_check_connect(obd);
if (!lmv->desc.ld_active_tgt_count)
RETURN(-EIO);
repeat:
+ LASSERT(++loop <= 2);
obj = lmv_grab_obj(obd, &op_data->fid1);
if (obj) {
- mds = raw_name2idx(obj->objcount, op_data->name,
+ mds = raw_name2idx(obj->hashtype, obj->objcount, op_data->name,
op_data->namelen);
op_data->fid1 = obj->objs[mds].fid;
lmv_put_obj(obj);
}
- CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
- op_data->namelen, op_data->name,
- (unsigned long)op_data->fid1.mds,
+ CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n", op_data->namelen,
+ op_data->name, (unsigned long)op_data->fid1.mds,
(unsigned long)op_data->fid1.id,
(unsigned long)op_data->fid1.generation);
rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data,
datalen, mode, uid, gid, rdev, request);
if (rc == 0) {
-
if (*request == NULL)
RETURN(rc);
- mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
- sizeof(*mds_body));
- LASSERT(mds_body != NULL);
+ body = lustre_msg_buf((*request)->rq_repmsg, 0,
+ sizeof(*body));
+ LASSERT(body != NULL);
- CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
- (unsigned long)mds_body->fid1.id,
- (unsigned long)mds_body->fid1.generation,
- op_data->fid1.mds);
+ CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, "
+ "mds = %d\n", (unsigned long)body->fid1.id,
+ (unsigned long)body->fid1.generation, op_data->fid1.mds);
- LASSERT(mds_body->valid & OBD_MD_MDS ||
- mds_body->mds == op_data->fid1.mds);
+ LASSERT(body->valid & OBD_MD_MDS ||
+ body->mds == op_data->fid1.mds);
} else if (rc == -ERESTART) {
/* directory got splitted. time to update local object and
* repeat the request with proper MDS */
LASSERT(mea != NULL);
for (i = 0; i < mea->mea_count; i++) {
- if (lmv->tgts[i].ltd_exp == NULL)
- continue;
-
memset(&data2, 0, sizeof(data2));
data2.fid1 = mea->mea_fids[i];
mds = data2.fid1.mds;
+ if (lmv->tgts[mds].ltd_exp == NULL)
+ continue;
+
rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
&data2, lockh + i, lmm, lmmsize, cb_completion,
cb_blocking, cb_data);
if (obj) {
/* directory is splitted. look for right mds for this
* name */
- mds = raw_name2idx(obj->objcount, (char *)data->name,
- data->namelen);
+ mds = raw_name2idx(obj->hashtype, obj->objcount,
+ (char *)data->name, data->namelen);
data->fid1 = obj->objs[mds].fid;
lmv_put_obj(obj);
}
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct ll_fid rfid = *fid;
- int rc, mds = fid->mds;
+ int rc, mds = fid->mds, loop = 0;
struct mds_body *body;
struct lmv_obj *obj;
ENTRY;
if (rc)
RETURN(rc);
repeat:
+ LASSERT(++loop <= 2);
obj = lmv_grab_obj(obd, fid);
if (obj) {
/* directory is splitted. look for right mds for this name */
- mds = raw_name2idx(obj->objcount, filename, namelen - 1);
+ mds = raw_name2idx(obj->hashtype, obj->objcount, filename, namelen - 1);
rfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
(unsigned long)rfid.mds, (unsigned long)rfid.id,
(unsigned long)rfid.generation);
- rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename,
+ rc = md_getattr_name(lmv->tgts[rfid.mds].ltd_exp, &rfid, filename,
namelen, valid, ea_size, request);
if (rc == 0) {
/* this could be cross-node reference. in this case all we have
/* usual link request */
obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
- rc = raw_name2idx(obj->objcount, data->name,
+ rc = raw_name2idx(obj->hashtype, obj->objcount, data->name,
data->namelen);
data->fid1 = obj->objs[rc].fid;
lmv_put_obj(obj);
if (obj) {
/* directory is already splitted, so we have to forward request
* to the right MDS */
- mds = raw_name2idx(obj->objcount, (char *)old, oldlen);
+ mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)old, oldlen);
data->fid1 = obj->objs[mds].fid;
CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
(unsigned long)obj->objs[mds].fid.mds,
if (obj) {
/* directory is already splitted, so we have to forward request
* to the right MDS */
- mds = raw_name2idx(obj->objcount, (char *)new, newlen);
+ mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)new, newlen);
data->fid2 = obj->objs[mds].fid;
CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
(unsigned long)obj->objs[mds].fid.mds,
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- int rc = 0, i = data->fid1.mds;
struct ptlrpc_request *req;
- struct mds_body *mds_body;
+ struct mds_body *body;
struct lmv_obj *obj;
+ int rc = 0, i;
ENTRY;
rc = lmv_check_connect(obd);
for (i = 0; i < obj->objcount; i++) {
data->fid1 = obj->objs[i].fid;
- rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr,
- ea, ealen, ea2, ea2len, &req);
- if (rc) {
- lmv_put_obj(obj);
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
+ rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data,
+ iattr, ea, ealen, ea2, ea2len, &req);
if (fid_equal(&obj->fid, &obj->objs[i].fid)) {
/* this is master object and this request should
} else {
ptlrpc_req_finished(req);
}
+
+ if (rc)
+ break;
}
lmv_put_obj(obj);
} else {
- LASSERT(i < lmv->desc.ld_tgt_count);
- rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen,
- ea2, ea2len, request);
+ LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count);
+ rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data,
+ iattr, ea, ealen, ea2, ea2len, request);
if (rc == 0) {
- mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
- sizeof(*mds_body));
- LASSERT(mds_body != NULL);
- LASSERT(mds_body->mds == i);
+ body = lustre_msg_buf((*request)->rq_repmsg, 0,
+ sizeof(*body));
+ LASSERT(body != NULL);
+ LASSERT(body->mds == data->fid1.mds);
}
}
RETURN(rc);
if (rc)
RETURN(rc);
- rc = md_sync(lmv->tgts[0].ltd_exp, fid, request);
+ rc = md_sync(lmv->tgts[fid->mds].ltd_exp, fid, request);
RETURN(rc);
}
LASSERT(mea != NULL);
for (i = 0; i < mea->mea_count; i++) {
- if (lmv->tgts[i].ltd_exp == NULL)
- continue;
-
memset(&data2, 0, sizeof(data2));
data2.fid1 = mea->mea_fids[i];
data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
mds = data2.fid1.mds;
+
+ if (lmv->tgts[mds].ltd_exp == NULL)
+ continue;
+
rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n",
(unsigned long) mea->mea_fids[i].mds,
RETURN(rc);
}
+int lmv_delete_object(struct obd_export *exp, struct ll_fid *fid)
+{
+ ENTRY;
+
+ if (!lmv_delete_obj(exp, fid)) {
+ CDEBUG(D_OTHER, "Object %lu/%lu/%lu is not found.\n",
+ (unsigned long)fid->mds, (unsigned long)fid->id,
+ (unsigned long)fid->generation);
+ }
+
+ RETURN(0);
+}
+
int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
struct ptlrpc_request **request)
{
obj = lmv_grab_obj(obd, &data->fid1);
if (obj) {
- i = raw_name2idx(obj->objcount, data->name,
+ i = raw_name2idx(obj->hashtype, obj->objcount, data->name,
data->namelen);
data->fid1 = obj->objs[i].fid;
lmv_put_obj(obj);
rc = lmv_check_connect(obd);
if (rc)
RETURN(ERR_PTR(rc));
+#warning "we need well-desgined readdir() implementation to remove this mess"
obd = lmv->tgts[0].ltd_exp->exp_obd;
EXIT;
return obd;
RETURN(0);
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+ continue;
+ }
+
rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
if (rc) {
CERROR("obd_init_ea_size() failed on MDT target %d, "
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mea *mea;
int i, c, rc = 0;
+ struct mea *mea;
struct ll_fid mfid;
+ int lcount;
ENTRY;
rc = lmv_check_connect(obd);
mea = (struct mea *)*ea;
if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
mea->mea_count = lmv->desc.ld_tgt_count;
+ mea->mea_magic = MEA_MAGIC_ALL_CHARS;
mea->mea_master = -1;
-
- for (i = 0, c = 0; c < mea->mea_count &&
- i < lmv->desc.ld_tgt_count; i++) {
+ lcount = lmv->desc.ld_tgt_count;
+ for (i = 0, c = 0; c < mea->mea_count && i < lcount; i++) {
struct lov_stripe_md obj_md;
struct lov_stripe_md *obj_mdp = &obj_md;
RETURN(rc);
}
+static int lmv_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid)
+{
+ struct llog_ctxt *ctxt;
+ int rc;
+ ENTRY;
+
+ rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ &llog_client_ops);
+ if (rc == 0) {
+ ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
+ ctxt->loc_imp = tgt->u.cli.cl_import;
+ }
+
+ RETURN(rc);
+}
+
+static int lmv_llog_finish(struct obd_device *obd,
+ struct obd_llogs *llogs, int count)
+{
+ int rc;
+ ENTRY;
+
+ rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
+ RETURN(rc);
+}
+
static int lmv_get_info(struct obd_export *exp, __u32 keylen,
void *key, __u32 *vallen, void *val)
{
}
lmv = &obd->u.lmv;
- if (keylen >= strlen("client") && strcmp(key, "client") == 0) {
- struct lmv_tgt_desc *tgts;
- int i, rc;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- for (i = 0, tgts = lmv->tgts;
- i < lmv->desc.ld_tgt_count; i++, tgts++) {
- rc = obd_set_info(tgts->ltd_exp, keylen, key, vallen, val);
- if (rc)
- RETURN(rc);
- }
- RETURN(0);
- } else if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
+ if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
lmv->server_timeout = 1;
lmv_set_timeouts(obd);
RETURN(0);
.o_connect = lmv_connect,
.o_disconnect = lmv_disconnect,
.o_statfs = lmv_statfs,
+ .o_llog_init = lmv_llog_init,
+ .o_llog_finish = lmv_llog_finish,
.o_get_info = lmv_get_info,
.o_set_info = lmv_set_info,
.o_create = lmv_obd_create,
.m_unlink = lmv_unlink,
.m_get_real_obd = lmv_get_real_obd,
.m_valid_attrs = lmv_valid_attrs,
+ .m_delete_object = lmv_delete_object,
};
int __init lmv_init(void)