#include <linux/seq_file.h>
#include <linux/lprocfs_status.h>
+#include "lov_internal.h"
+
+static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
+ int stripeno, obd_off *obd_off);
+
struct lov_file_handles {
struct portals_handle lfh_handle;
atomic_t lfh_refcount;
struct lov_file_handles *lfh = lfhp;
atomic_inc(&lfh->lfh_refcount);
- CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
+ CDEBUG(D_MALLOC, "GETting lfh %p : new refcount %d\n", lfh,
atomic_read(&lfh->lfh_refcount));
}
static void lov_lfh_put(struct lov_file_handles *lfh)
{
- CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
+ CDEBUG(D_MALLOC, "PUTting lfh %p : new refcount %d\n", lfh,
atomic_read(&lfh->lfh_refcount) - 1);
LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
atomic_read(&lfh->lfh_refcount) < 0x5a5a);
struct proc_dir_entry *entry;
int rc;
- lprocfs_init_vars(&lvars);
+ lprocfs_init_vars(lov, &lvars);
rc = lprocfs_obd_attach(dev, lvars.obd_vars);
- if (rc)
+ if (rc)
return rc;
entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
- if (entry == NULL)
+ if (entry == NULL)
RETURN(-ENOMEM);
- entry->proc_fops = &ll_proc_target_fops;
+ entry->proc_fops = &lov_proc_target_fops;
entry->data = dev;
-
+
return rc;
-
}
int lov_detach(struct obd_device *dev)
if (rc)
RETURN(rc);
+ exp = class_conn2export(conn);
+ spin_lock_init(&exp->exp_lov_data.led_lock);
+ INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+
/* We don't want to actually do the underlying connections more than
* once, so keep track. */
lov->refcount++;
- if (lov->refcount > 1)
+ if (lov->refcount > 1) {
+ class_export_put(exp);
RETURN(0);
-
- exp = class_conn2export(conn);
- spin_lock_init(&exp->exp_lov_data.led_lock);
- INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+ }
/* retrieve LOV metadata from MDS */
rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
* array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
* terminated), but I still need to verify it makes overall
* sense */
- mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
- LASSERT (mdesc != NULL);
- LASSERT_REPSWABBED (req, 0);
+ mdesc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*mdesc));
+ LASSERT(mdesc != NULL);
+ LASSERT_REPSWABBED(req, 0);
*desc = *mdesc;
* demands on memory here. */
lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
OBD_ALLOC(lov->tgts, lov->bufsize);
- if (!lov->tgts) {
+ if (lov->tgts == NULL) {
CERROR("Out of memory\n");
GOTO(out_req, rc = -ENOMEM);
}
uuids = lustre_msg_buf(req->rq_repmsg, 1,
sizeof(*uuids) * desc->ld_tgt_count);
- LASSERT (uuids != NULL);
- LASSERT_REPSWABBED (req, 1);
+ LASSERT(uuids != NULL);
+ LASSERT_REPSWABBED(req, 1);
for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
struct obd_uuid *uuid = &tgts->uuid;
}
mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
- ptlrpc_req_finished (req);
+ mdc->cl_max_mds_cookiesize = desc->ld_tgt_count *
+ sizeof(struct llog_cookie);
+ ptlrpc_req_finished(req);
class_export_put(exp);
RETURN (0);
RETURN (rc);
}
-static int lov_disconnect(struct lustre_handle *conn, int failover)
+static int lov_disconnect(struct lustre_handle *conn, int flags)
{
struct obd_device *obd = class_conn2obd(conn);
struct lov_obd *lov = &obd->u.lov;
class_conn2obd(&lov->tgts[i].conn);
osc_obd->obd_no_recov = 1;
}
- rc = obd_disconnect(&lov->tgts[i].conn, failover);
+ rc = obd_disconnect(&lov->tgts[i].conn, flags);
if (rc) {
if (lov->tgts[i].active) {
CERROR("Target %s disconnect error %d\n",
lov->bufsize = 0;
lov->tgts = NULL;
+ out_local:
exp = class_conn2export(conn);
if (exp == NULL) {
CERROR("export handle "LPU64" invalid! If you can reproduce, "
spin_unlock(&exp->exp_lov_data.led_lock);
class_export_put(exp);
- out_local:
rc = class_disconnect(conn, 0);
RETURN(rc);
}
static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
struct lov_stripe_md *lsm, int stripeno, int *set)
{
+ valid &= src->o_valid;
+
if (*set) {
if (valid & OBD_MD_FLSIZE) {
/* this handles sparse files properly */
if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
tgt->o_mtime = src->o_mtime;
} else {
- obdo_cpy_md(tgt, src, valid);
+ memcpy(tgt, src, sizeof(*tgt));
+ tgt->o_id = lsm->lsm_object_id;
if (valid & OBD_MD_FLSIZE)
tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
*set = 1;
}
}
+#ifndef log2
+#define log2(n) ffz(~(n))
+#endif
+
/* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct lustre_handle *conn, struct obdo *oa,
+static int lov_create(struct lustre_handle *conn, struct obdo *src_oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_stripe_md *lsm;
- struct lov_oinfo *loi;
- struct obdo *tmp;
+ struct lov_oinfo *loi = NULL;
+ struct obdo *tmp_oa, *ret_oa;
+ struct llog_cookie *cookies = NULL;
unsigned ost_count, ost_idx;
- int set = 0, obj_alloc = 0;
- int rc = 0, i;
+ int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i;
ENTRY;
LASSERT(ea);
if (!export)
- GOTO(out_exp, rc = -EINVAL);
+ RETURN(-EINVAL);
lov = &export->exp_obd->u.lov;
if (!lov->desc.ld_active_tgt_count)
GOTO(out_exp, rc = -EIO);
- tmp = obdo_alloc();
- if (!tmp)
+ ret_oa = obdo_alloc();
+ if (!ret_oa)
GOTO(out_exp, rc = -ENOMEM);
+ tmp_oa = obdo_alloc();
+ if (!tmp_oa)
+ GOTO(out_oa, rc = -ENOMEM);
+
lsm = *ea;
if (!lsm) {
- rc = obd_alloc_memmd(conn, &lsm);
+ int stripes;
+ ost_count = lov_get_stripecnt(lov, 0);
+
+ /* If the MDS file was truncated up to some size, stripe over
+ * enough OSTs to allow the file to be created at that size.
+ */
+ if (src_oa->o_valid & OBD_MD_FLSIZE) {
+ stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+ do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
+
+ if (stripes > lov->desc.ld_active_tgt_count)
+ GOTO(out_exp, rc = -EFBIG);
+ if (stripes < ost_count)
+ stripes = ost_count;
+ } else
+ stripes = ost_count;
+
+ rc = lov_alloc_memmd(&lsm, stripes);
if (rc < 0)
GOTO(out_tmp, rc);
rc = 0;
- lsm->lsm_magic = LOV_MAGIC;
}
ost_count = lov->desc.ld_tgt_count;
- LASSERT(oa->o_valid & OBD_MD_FLID);
- lsm->lsm_object_id = oa->o_id;
+ LASSERT(src_oa->o_valid & OBD_MD_FLID);
+ lsm->lsm_object_id = src_oa->o_id;
if (!lsm->lsm_stripe_size)
lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
get_random_bytes(&ost_idx, 2);
ost_idx %= ost_count;
- } else
+ } else {
ost_idx = lsm->lsm_stripe_offset;
+ }
CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
+ /* XXX LOV STACKING: need to figure out how many real OSCs */
+ if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
+ oti_alloc_cookies(oti, lsm->lsm_stripe_count);
+ if (!oti->oti_logcookies)
+ GOTO(out_cleanup, rc = -ENOMEM);
+ cookies = oti->oti_logcookies;
+ }
+
loi = lsm->lsm_oinfo;
for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
struct lov_stripe_md obj_md;
}
/* create data objects with "parent" OA */
- memcpy(tmp, oa, sizeof(*tmp));
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+
+ /* XXX When we start creating objects on demand, we need to
+ * make sure that we always create the object on the
+ * stripe which holds the existing file size.
+ */
+ if (src_oa->o_valid & OBD_MD_FLSIZE) {
+ if (lov_stripe_offset(lsm, src_oa->o_size, i,
+ &tmp_oa->o_size) < 0 &&
+ tmp_oa->o_size)
+ tmp_oa->o_size--;
+
+ CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
+ i, tmp_oa->o_size, src_oa->o_size);
+ }
+
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
- err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
+ err = obd_create(&lov->tgts[ost_idx].conn, tmp_oa,&obj_mdp,oti);
if (err) {
if (lov->tgts[ost_idx].active) {
CERROR("error creating objid "LPX64" sub-object"
- " on OST idx %d/%d: rc = %d\n", oa->o_id,
- ost_idx, lsm->lsm_stripe_count, err);
+ " on OST idx %d/%d: rc = %d\n",
+ src_oa->o_id, ost_idx,
+ lsm->lsm_stripe_count, err);
if (err > 0) {
CERROR("obd_create returned invalid "
"err %d\n", err);
rc = err;
continue;
}
- loi->loi_id = tmp->o_id;
+ loi->loi_id = tmp_oa->o_id;
loi->loi_ost_idx = ost_idx;
CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
lsm->lsm_object_id, loi->loi_id, ost_idx);
if (set == 0)
lsm->lsm_stripe_offset = ost_idx;
- lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
- ot_init(&loi->loi_dirty_ot_inline);
+ lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
+ obj_alloc, &set);
loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
+ ot_init(loi->loi_dirty_ot);
+ if (cookies)
+ ++oti->oti_logcookies;
+ if (tmp_oa->o_valid & OBD_MD_FLCOOKIE)
+ ++cookie_sent;
++obj_alloc;
++loi;
GOTO(out_done, rc = 0);
}
+ /* If we were passed specific striping params, then a failure to
+ * meet those requirements is an error, since we can't reallocate
+ * that memory (it might be part of a larger array or something).
+ *
+ * We can only get here if lsm_stripe_count was originally > 1.
+ */
if (*ea != NULL) {
CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
} else {
struct lov_stripe_md *lsm_new;
/* XXX LOV STACKING call into osc for sizes */
- unsigned size = lov_stripe_md_size(obj_alloc);
+ unsigned oldsize, newsize;
+
+ if (oti && cookies && cookie_sent) {
+ oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
+ newsize = obj_alloc * sizeof(*cookies);
+
+ oti_alloc_cookies(oti, obj_alloc);
+ if (oti->oti_logcookies) {
+ memcpy(oti->oti_logcookies, cookies, newsize);
+ OBD_FREE(cookies, oldsize);
+ cookies = oti->oti_logcookies;
+ } else {
+ CWARN("'leaking' %d bytes\n", oldsize-newsize);
+ }
+ }
CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
- OBD_ALLOC(lsm_new, size);
- if (!lsm_new)
- GOTO(out_cleanup, rc = -ENOMEM);
- memcpy(lsm_new, lsm, size);
- lsm_new->lsm_stripe_count = obj_alloc;
-
- /* XXX LOV STACKING call into osc for sizes */
- OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
- lsm = lsm_new;
-
+ oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
+ newsize = lov_stripe_md_size(obj_alloc);
+ OBD_ALLOC(lsm_new, newsize);
+ if (lsm_new != NULL) {
+ memcpy(lsm_new, lsm, newsize);
+ lsm_new->lsm_stripe_count = obj_alloc;
+ OBD_FREE(lsm, newsize);
+ lsm = lsm_new;
+ } else {
+ CWARN("'leaking' %d bytes\n", oldsize - newsize);
+ }
rc = 0;
}
out_done:
*ea = lsm;
+ if (src_oa->o_valid & OBD_MD_FLSIZE &&
+ ret_oa->o_size != src_oa->o_size) {
+ CERROR("original size "LPU64" isn't new object size "LPU64"\n",
+ src_oa->o_size, ret_oa->o_size);
+ LBUG();
+ }
+ ret_oa->o_id = src_oa->o_id;
+ memcpy(src_oa, ret_oa, sizeof(*src_oa));
out_tmp:
- obdo_free(tmp);
+ obdo_free(tmp_oa);
+ out_oa:
+ obdo_free(ret_oa);
+ if (oti && cookies) {
+ oti->oti_logcookies = cookies;
+ if (!cookie_sent) {
+ oti_free_cookies(oti);
+ src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
+ } else {
+ src_oa->o_valid |= OBD_MD_FLCOOKIE;
+ }
+ }
out_exp:
class_export_put(export);
return rc;
--loi;
/* destroy already created objects here */
- memcpy(tmp, oa, sizeof(*tmp));
- tmp->o_id = loi->loi_id;
- err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
- NULL);
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ tmp_oa->o_id = loi->loi_id;
+
+ if (oti && cookie_sent) {
+ err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
+ NULL, 1, --oti->oti_logcookies,
+ OBD_LLOG_FL_SENDNOW);
+ if (err)
+ CERROR("Failed to cancel objid "LPX64" subobj "
+ LPX64" cookie on OST idx %d: rc = %d\n",
+ src_oa->o_id, loi->loi_id,
+ loi->loi_ost_idx, err);
+ }
+
+ err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
+ NULL, oti);
if (err)
- CERROR("Failed to uncreate objid "LPX64" subobj "
- LPX64" on OST idx %d: rc = %d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx,
- err);
+ CERROR("Failed to uncreate objid "LPX64" subobj "LPX64
+ " on OST idx %d: rc = %d\n", src_oa->o_id,
+ loi->loi_id, loi->loi_ost_idx, err);
}
if (*ea == NULL)
obd_free_memmd(conn, &lsm);
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
- FD_OSTDATA_SIZE);
+ memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
- NULL, NULL);
+ NULL, oti);
if (err && lov->tgts[loi->loi_ost_idx].active) {
CERROR("error: destroying objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
- FD_OSTDATA_SIZE);
+ memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
return rc;
}
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
- struct lov_getattr_async_args *aa, int rc)
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
+ int rc)
{
+ struct lov_getattr_async_args *aa = data;
struct lov_stripe_md *lsm = aa->aa_lsm;
struct obdo *oa = aa->aa_oa;
- struct obdo *obdos = aa->aa_stripe_oas;
+ struct obdo *obdos = aa->aa_obdos;
struct lov_oinfo *loi;
int i;
int set = 0;
if (rc == 0) {
/* NB all stripe requests succeeded to get here */
- for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++,loi++) {
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++) {
if (obdos[i].o_valid == 0) /* inactive stripe */
continue;
memcpy(&obdos[i], oa, sizeof(obdos[i]));
obdos[i].o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
- FD_OSTDATA_SIZE);
+ memcpy(obdo_handle(&obdos[i]), &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
else
obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
aa = (struct lov_getattr_async_args *)&rqset->set_args;
aa->aa_lsm = lsm;
aa->aa_oa = oa;
- aa->aa_stripe_oas = obdos;
+ aa->aa_obdos = obdos;
GOTO (out, rc = 0);
out_obdos:
RETURN (rc);
}
-static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
+static int lov_setattr(struct lustre_handle *conn, struct obdo *src_oa,
struct lov_stripe_md *lsm, struct obd_trans_info *oti)
{
- struct obdo *tmp;
+ struct obdo *tmp_oa, *ret_oa;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
if (!export || !export->exp_obd)
GOTO(out, rc = -ENODEV);
- /* size changes should go through punch and not setattr */
- LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
-
- /* for now, we only expect mtime updates here */
- LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
-
- tmp = obdo_alloc();
- if (!tmp)
+ /* for now, we only expect time updates here */
+ LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE|OBD_MD_FLMODE|
+ OBD_MD_FLATIME | OBD_MD_FLMTIME |
+ OBD_MD_FLCTIME)));
+ ret_oa = obdo_alloc();
+ if (!ret_oa)
GOTO(out, rc = -ENOMEM);
- if (oa->o_valid & OBD_MD_FLHANDLE)
- lfh = lov_handle2lfh(obdo_handle(oa));
+ tmp_oa = obdo_alloc();
+ if (!tmp_oa)
+ GOTO(out_oa, rc = -ENOMEM);
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
continue;
}
- obdo_cpy_md(tmp, oa, oa->o_valid);
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
if (lfh)
- memcpy(obdo_handle(tmp), lfh->lfh_och + i,
- FD_OSTDATA_SIZE);
+ memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
else
- tmp->o_valid &= ~OBD_MD_FLHANDLE;
+ tmp_oa->o_valid &= ~OBD_MD_FLHANDLE;
- tmp->o_id = loi->loi_id;
+ tmp_oa->o_id = loi->loi_id;
- err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
+ err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
NULL, NULL);
if (err) {
if (lov->tgts[loi->loi_ost_idx].active) {
CERROR("error: setattr objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx,
- err);
+ src_oa->o_id, loi->loi_id,
+ loi->loi_ost_idx, err);
if (!rc)
rc = err;
}
- } else
- set = 1;
+ continue;
+ }
+
+ lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
}
- obdo_free(tmp);
if (!set && !rc)
rc = -EIO;
if (lfh != NULL)
lov_lfh_put(lfh);
- GOTO(out, rc);
- out:
+
+ ret_oa->o_id = src_oa->o_id;
+ memcpy(src_oa, ret_oa, sizeof(*src_oa));
+ GOTO(out_tmp, rc);
+out_tmp:
+ obdo_free(tmp_oa);
+out_oa:
+ obdo_free(ret_oa);
+out:
class_export_put(export);
return rc;
}
-static int lov_open(struct lustre_handle *conn, struct obdo *oa,
+static int lov_open(struct lustre_handle *conn, struct obdo *src_oa,
struct lov_stripe_md *lsm, struct obd_trans_info *oti,
struct obd_client_handle *och)
{
- struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
+ struct obdo *tmp_oa, *ret_oa;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
struct lov_oinfo *loi;
if (!export || !export->exp_obd)
GOTO(out_exp, rc = -ENODEV);
- tmp = obdo_alloc();
- if (!tmp)
+ ret_oa = obdo_alloc();
+ if (!ret_oa)
GOTO(out_exp, rc = -ENOMEM);
+ tmp_oa = obdo_alloc();
+ if (!tmp_oa)
+ GOTO(out_oa, rc = -ENOMEM);
+
lfh = lov_lfh_new();
if (lfh == NULL)
GOTO(out_tmp, rc = -ENOMEM);
- OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
+ OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof(*och));
if (!lfh->lfh_och)
GOTO(out_lfh, rc = -ENOMEM);
lov = &export->exp_obd->u.lov;
- oa->o_size = 0;
- oa->o_blocks = 0;
+ src_oa->o_size = 0;
+ src_oa->o_blocks = 0;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
if (lov->tgts[loi->loi_ost_idx].active == 0) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
}
/* create data objects with "parent" OA */
- memcpy(tmp, oa, sizeof(*tmp));
- tmp->o_id = loi->loi_id;
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ tmp_oa->o_id = loi->loi_id;
- rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
- NULL, NULL, lfh->lfh_och + i);
+ rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
+ NULL, NULL, &lfh->lfh_och[i]);
if (rc) {
if (!lov->tgts[loi->loi_ost_idx].active) {
rc = 0;
}
CERROR("error: open objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n",
- oa->o_id, lsm->lsm_oinfo[i].loi_id,
+ src_oa->o_id, lsm->lsm_oinfo[i].loi_id,
loi->loi_ost_idx, rc);
goto out_handles;
}
- lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
+ lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
}
lfh->lfh_count = lsm->lsm_stripe_count;
och->och_fh.cookie = lfh->lfh_handle.h_cookie;
- obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
- oa->o_valid |= OBD_MD_FLHANDLE;
+ obdo_handle(ret_oa)->cookie = lfh->lfh_handle.h_cookie;
+ ret_oa->o_valid |= OBD_MD_FLHANDLE;
+ ret_oa->o_id = src_oa->o_id;
+ memcpy(src_oa, ret_oa, sizeof(*src_oa));
- /* llfh refcount transfers to list */
+ /* lfh refcount transfers to list */
spin_lock(&export->exp_lov_data.led_lock);
list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
spin_unlock(&export->exp_lov_data.led_lock);
GOTO(out_tmp, rc);
out_tmp:
- obdo_free(tmp);
+ obdo_free(tmp_oa);
+ out_oa:
+ obdo_free(ret_oa);
out_exp:
class_export_put(export);
return rc;
if (lov->tgts[loi->loi_ost_idx].active == 0)
continue;
- memcpy(tmp, oa, sizeof(*tmp));
- tmp->o_id = loi->loi_id;
- memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ tmp_oa->o_id = loi->loi_id;
+ memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i], FD_OSTDATA_SIZE);
- err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
+ err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
NULL, NULL);
if (err && lov->tgts[loi->loi_ost_idx].active) {
CERROR("error: closing objid "LPX64" subobj "LPX64
" on OST idx %d after open error: rc=%d\n",
- oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+ src_oa->o_id, loi->loi_id, loi->loi_ost_idx,err);
}
}
if (oa->o_valid & OBD_MD_FLHANDLE)
lfh = lov_handle2lfh(obdo_handle(oa));
+ if (!lfh)
+ LBUG();
lov = &export->exp_obd->u.lov;
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
+ memcpy(obdo_handle(&tmp), &lfh->lfh_och[i],
FD_OSTDATA_SIZE);
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
lov_lfh_destroy(lfh);
+ LASSERT(atomic_read(&lfh->lfh_refcount) == 1);
lov_lfh_put(lfh); /* balance handle2lfh above */
- }
+ } else
+ LBUG();
GOTO(out, rc);
out:
class_export_put(export);
return rc;
}
-#ifndef log2
-#define log2(n) ffz(~(n))
-#endif
-
/* we have an offset in file backed by an lov and want to find out where
* that offset lands in our given stripe of the file. for the easy
* case where the offset is within the stripe, we just have to scale the
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
- FD_OSTDATA_SIZE);
+ memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
return 0;
}
-static int lov_brw(int cmd, struct lustre_handle *conn,
+static int lov_brw(int cmd, struct lustre_handle *conn, struct obdo *src_oa,
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct obd_trans_info *oti)
{
int ost_idx;
} *stripeinfo, *si, *si_last;
struct obd_export *export = class_conn2export(conn);
+ struct obdo *ret_oa = NULL, *tmp_oa = NULL;
+ struct lov_file_handles *lfh = NULL;
struct lov_obd *lov;
struct brw_page *ioarr;
struct lov_oinfo *loi;
- int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
+ int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0;
ENTRY;
if (lsm_bad_magic(lsm))
if (!ioarr)
GOTO(out_where, rc = -ENOMEM);
+ if (src_oa) {
+ ret_oa = obdo_alloc();
+ if (!ret_oa)
+ GOTO(out_ioarr, rc = -ENOMEM);
+
+ tmp_oa = obdo_alloc();
+ if (!tmp_oa)
+ GOTO(out_oa, rc = -ENOMEM);
+
+ if (src_oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(src_oa));
+ else
+ src_oa->o_valid &= ~OBD_MD_FLHANDLE;
+ }
+
for (i = 0; i < oa_bufs; i++) {
where[i] = lov_stripe_number(lsm, pga[i].off);
stripeinfo[where[i]].bufct++;
if (lov->tgts[si->ost_idx].active == 0) {
CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
- GOTO(out_ioarr, rc = -EIO);
+ GOTO(out_oa, rc = -EIO);
}
if (si->bufct) {
LASSERT(shift < oa_bufs);
- rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+ if (src_oa) {
+ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+ if (lfh)
+ memcpy(obdo_handle(tmp_oa),
+ &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
+ }
+
+ tmp_oa->o_id = si->lsm.lsm_object_id;
+ rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn, tmp_oa,
&si->lsm, si->bufct, &ioarr[shift],
oti);
if (rc)
GOTO(out_ioarr, rc);
+
+ lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
+ i, &set);
}
}
- GOTO(out_ioarr, rc);
+
+ ret_oa->o_id = src_oa->o_id;
+ memcpy(src_oa, ret_oa, sizeof(*src_oa));
+
+ GOTO(out_oa, rc);
+ out_oa:
+ if (tmp_oa)
+ obdo_free(tmp_oa);
+ if (ret_oa)
+ obdo_free(ret_oa);
out_ioarr:
OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
out_where:
OBD_FREE(where, sizeof(*where) * oa_bufs);
+ if (lfh)
+ lov_lfh_put(lfh);
out_sinfo:
OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
out_exp:
return rc;
}
-static int lov_brw_interpret (struct ptlrpc_request_set *set,
- struct lov_brw_async_args *aa, int rc)
+static int lov_brw_interpret(struct ptlrpc_request_set *rqset,
+ struct lov_brw_async_args *aa, int rc)
{
- obd_count oa_bufs = aa->aa_oa_bufs;
- struct brw_page *ioarr = aa->aa_ioarr;
+ struct lov_stripe_md *lsm = aa->aa_lsm;
+ obd_count oa_bufs = aa->aa_oa_bufs;
+ struct obdo *oa = aa->aa_oa;
+ struct obdo *obdos = aa->aa_obdos;
+ struct brw_page *ioarr = aa->aa_ioarr;
+ struct lov_oinfo *loi;
+ int i, set = 0;
ENTRY;
- OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
- RETURN (rc);
+ if (rc == 0) {
+ /* NB all stripe requests succeeded to get here */
+
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+ i++, loi++) {
+ if (obdos[i].o_valid == 0) /* inactive stripe */
+ continue;
+
+ lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
+ i, &set);
+ }
+
+ if (!set) {
+ CERROR("No stripes had valid attrs\n");
+ rc = -EIO;
+ }
+ }
+ oa->o_id = lsm->lsm_object_id;
+
+ OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos));
+ OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
+ RETURN(rc);
}
-static int lov_brw_async(int cmd, struct lustre_handle *conn,
+static int lov_brw_async(int cmd, struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct ptlrpc_request_set *set,
struct obd_trans_info *oti)
} *stripeinfo, *si, *si_last;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
+ struct lov_file_handles *lfh = NULL;
struct brw_page *ioarr;
+ struct obdo *obdos = NULL;
struct lov_oinfo *loi;
struct lov_brw_async_args *aa;
int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
if (!where)
GOTO(out_sinfo, rc = -ENOMEM);
+ if (oa) {
+ OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count);
+ if (!obdos)
+ GOTO(out_where, rc = -ENOMEM);
+
+ if (oa->o_valid & OBD_MD_FLHANDLE)
+ lfh = lov_handle2lfh(obdo_handle(oa));
+ else
+ oa->o_valid &= ~OBD_MD_FLHANDLE;
+ }
+
OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
if (!ioarr)
- GOTO(out_where, rc = -ENOMEM);
+ GOTO(out_obdos, rc = -ENOMEM);
for (i = 0; i < oa_bufs; i++) {
where[i] = lov_stripe_number(lsm, pga[i].off);
si->index = si_last->index + si_last->bufct;
si->lsm.lsm_object_id = loi->loi_id;
si->ost_idx = loi->loi_ost_idx;
+
+ if (oa) {
+ memcpy(&obdos[i], oa, sizeof(*obdos));
+ obdos[i].o_id = si->lsm.lsm_object_id;
+ if (lfh)
+ memcpy(obdo_handle(&obdos[i]),
+ &lfh->lfh_och[i].och_fh,
+ sizeof(lfh->lfh_och[i].och_fh));
+ }
}
for (i = 0; i < oa_bufs; i++) {
}
LASSERT(shift < oa_bufs);
+
rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
- &si->lsm, si->bufct, &ioarr[shift],
- set, oti);
+ &obdos[i], &si->lsm, si->bufct,
+ &ioarr[shift], set, oti);
if (rc)
GOTO(out_ioarr, rc);
}
- LASSERT (rc == 0);
- LASSERT (set->set_interpret == NULL);
- set->set_interpret = lov_brw_interpret;
- LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
+ LASSERT(rc == 0);
+ LASSERT(set->set_interpret == NULL);
+ set->set_interpret = (set_interpreter_func)lov_brw_interpret;
+ LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args));
aa = (struct lov_brw_async_args *)&set->set_args;
- aa->aa_oa_bufs = oa_bufs;
+ aa->aa_lsm = lsm;
+ aa->aa_obdos = obdos;
+ aa->aa_oa = oa;
aa->aa_ioarr = ioarr;
+ aa->aa_oa_bufs = oa_bufs;
+
+ /* Don't free ioarr or obdos - that's done in lov_brw_interpret */
GOTO(out_where, rc);
+
out_ioarr:
OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
+ out_obdos:
+ OBD_FREE(obdos, stripe_count * sizeof(*obdos));
out_where:
OBD_FREE(where, sizeof(*where) * oa_bufs);
+ if (lfh)
+ lov_lfh_put(lfh);
out_sinfo:
OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
out_exp:
(tot) += (add); \
} while(0)
-static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
+static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
+ unsigned long max_age)
{
- struct obd_export *tgt_export;
- struct lov_obd *lov;
+ struct lov_obd *lov = &obd->u.lov;
struct obd_statfs lov_sfs;
int set = 0;
int rc = 0;
int i;
ENTRY;
- if (!export || !export->exp_obd)
- RETURN(-ENODEV);
-
- lov = &export->exp_obd->u.lov;
/* We only get block data from the OBD */
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
continue;
}
- tgt_export = class_conn2export(&lov->tgts[i].conn);
- if (!tgt_export) {
- CDEBUG(D_HA, "lov idx %d NULL export\n", i);
- continue;
- }
-
- err = obd_statfs(tgt_export, &lov_sfs);
- class_export_put(tgt_export);
+ err = obd_statfs(class_conn2obd(&lov->tgts[i].conn), &lov_sfs,
+ max_age);
if (err) {
if (lov->tgts[i].active) {
CERROR("error: statfs OSC %s on OST idx %d: "
}
continue;
}
+
if (!set) {
memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
set = 1;
LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
}
}
+
if (set) {
__u32 expected_stripes = lov->desc.ld_default_stripe_count ?
lov->desc.ld_default_stripe_count :
do_div(osfs->os_ffree, expected_stripes);
} else if (!rc)
rc = -EIO;
+
RETURN(rc);
}
RETURN(-EINVAL);
}
-static int lov_mark_page_dirty(struct lustre_handle *conn,
+static int lov_set_info(struct lustre_handle *conn, obd_count keylen,
+ void *key, obd_count vallen, void *val)
+{
+ struct obd_device *obddev = class_conn2obd(conn);
+ struct lov_obd *lov = &obddev->u.lov;
+ int i, rc = 0;
+ ENTRY;
+
+ if (keylen < strlen("mds_conn") ||
+ memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
+ RETURN(-EINVAL);
+
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ int er;
+ er = obd_set_info(&lov->tgts[i].conn, keylen, key, vallen, val);
+ if (!rc)
+ rc = er;
+ }
+ RETURN(rc);
+}
+
+static int lov_mark_page_dirty(struct lustre_handle *conn,
struct lov_stripe_md *lsm, unsigned long offset)
{
struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
RETURN(-ENOMEM);
stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
- lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe,
+ lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe,
&off);
off >>= PAGE_CACHE_SHIFT;
loi = &lsm->lsm_oinfo[stripe];
- CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset,
+ CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset,
(unsigned long)off, stripe);
submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
RETURN(rc);
}
-static int lov_clear_dirty_pages(struct lustre_handle *conn,
+static int lov_clear_dirty_pages(struct lustre_handle *conn,
struct lov_stripe_md *lsm, unsigned long start,
unsigned long end, unsigned long *cleared)
obd_start >>= PAGE_CACHE_SHIFT;
obd_end >>= PAGE_CACHE_SHIFT;
- CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n",
- start, end, (unsigned long)obd_start,
+ CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n",
+ start, end, (unsigned long)obd_start,
(unsigned long)obd_end, loi->loi_ost_idx);
submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
- rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn,
+ rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn,
submd, obd_start, obd_end,
&osc_cleared);
if (rc)
*offset = 0;
lov = &export->exp_obd->u.lov;
rc = -ENOENT;
- for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
- i++, loi++) {
+ for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){
count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
skip = (lsm->lsm_stripe_count - 1) * count;
submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
- err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn,
+ err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn,
submd, &tmp);
if (err == -ENOENT)
continue;
GOTO(out_exp, rc = err);
rc = 0;
- if (tmp != ~0)
+ if (tmp != ~0)
tmp += (tmp/count * skip) + (i * count);
if (tmp > *offset)
*offset = tmp;
RETURN(rc);
}
+/* For LOV catalogs, we "nest" catalogs from the parent catalog. What this
+ * means is that the parent catalog has a bunch of log cookies that are
+ * pointing at one catalog for each OSC. The OSC catalogs in turn hold
+ * cookies for actual log files. */
+static int lov_get_catalogs(struct lov_obd *lov, struct llog_handle *cathandle)
+{
+ int i, rc;
+
+ ENTRY;
+ for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+ lov->tgts[i].ltd_cathandle = llog_new_log(cathandle,
+ &lov->tgts[i].uuid);
+ if (IS_ERR(lov->tgts[i].ltd_cathandle))
+ continue;
+ rc = llog_init_catalog(cathandle, &lov->tgts[i].uuid);
+ if (rc)
+ GOTO(err_logs, rc);
+ }
+ lov->lo_catalog_loaded = 1;
+ RETURN(0);
+err_logs:
+ while (i-- > 0) {
+ llog_delete_log(cathandle, lov->tgts[i].ltd_cathandle);
+ llog_close_log(cathandle, lov->tgts[i].ltd_cathandle);
+ }
+ return rc;
+}
+
+/* Add log records for each OSC that this object is striped over, and return
+ * cookies for each one. We _would_ have nice abstraction here, except that
+ * we need to keep cookies in stripe order, even if some are NULL, so that
+ * the right cookies are passed back to the right OSTs at the client side.
+ * Unset cookies should be all-zero (which will never occur naturally). */
+static int lov_log_add(struct lustre_handle *conn,
+ struct llog_handle *cathandle,
+ struct llog_trans_hdr *rec, struct lov_stripe_md *lsm,
+ struct llog_cookie *logcookies, int numcookies)
+{
+ struct obd_device *obd = class_conn2obd(conn);
+ struct lov_obd *lov = &obd->u.lov;
+ struct lov_oinfo *loi;
+ int i, rc = 0;
+ ENTRY;
+
+ LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
+
+ if (unlikely(!lov->lo_catalog_loaded))
+ lov_get_catalogs(lov, cathandle);
+
+ for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+ rc += obd_log_add(&lov->tgts[loi->loi_ost_idx].conn,
+ lov->tgts[loi->loi_ost_idx].ltd_cathandle,
+ rec, NULL, logcookies + rc, numcookies - rc);
+ }
+
+ RETURN(rc);
+}
+
+static int lov_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+ int count, struct llog_cookie *cookies, int flags)
+{
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ struct lov_oinfo *loi;
+ int rc = 0, i;
+ ENTRY;
+
+ LASSERT(lsm != NULL);
+ if (export == NULL || export->exp_obd == NULL)
+ GOTO(out, rc = -ENODEV);
+
+ LASSERT(count == lsm->lsm_stripe_count);
+
+ loi = lsm->lsm_oinfo;
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < count; i++, cookies++, loi++) {
+ int err;
+
+ err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
+ NULL, 1, cookies, flags);
+ if (err && lov->tgts[loi->loi_ost_idx].active) {
+ CERROR("error: objid "LPX64" subobj "LPX64
+ " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
+ loi->loi_id, loi->loi_ost_idx, err);
+ if (!rc)
+ rc = err;
+ }
+ }
+ GOTO(out, rc);
+ out:
+ class_export_put(export);
+ return rc;
+}
+
struct obd_ops lov_obd_ops = {
o_owner: THIS_MODULE,
o_attach: lov_attach,
o_cancel_unused: lov_cancel_unused,
o_iocontrol: lov_iocontrol,
o_get_info: lov_get_info,
- .o_mark_page_dirty = lov_mark_page_dirty,
- .o_clear_dirty_pages = lov_clear_dirty_pages,
- .o_last_dirty_offset = lov_last_dirty_offset,
+ o_set_info: lov_set_info,
+ o_log_add: lov_log_add,
+ o_log_cancel: lov_log_cancel,
+ o_mark_page_dirty: lov_mark_page_dirty,
+ o_clear_dirty_pages: lov_clear_dirty_pages,
+ o_last_dirty_offset: lov_last_dirty_offset,
};
int __init lov_init(void)
struct lprocfs_static_vars lvars;
int rc;
- printk(KERN_INFO "Lustre Logical Object Volume driver; "
- "info@clusterfs.com\n");
- lprocfs_init_vars(&lvars);
+ lprocfs_init_vars(lov, &lvars);
rc = class_register_type(&lov_obd_ops, lvars.module_vars,
OBD_LOV_DEVICENAME);
RETURN(rc);
}
-static void __exit lov_exit(void)
+static void /*__exit*/ lov_exit(void)
{
class_unregister_type(OBD_LOV_DEVICENAME);
}