X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_device.c;h=9de0d803ac622a3296b86e49272fd812c2ebcaf9;hb=a775bb1e09ebc0d5058a765cca978b61b7653370;hp=e9bfcfcc1de676aad6ee620bd20363b54a18fc26;hpb=0e660eab787c3b2857e4295f1ec554e016393885;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index e9bfcfc..9de0d80 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -60,12 +60,15 @@ #include #include /* for changelogs */ #include +#include #include "mdd_internal.h" const struct md_device_operations mdd_ops; +static struct lu_device_type mdd_device_type; static const char mdd_root_dir_name[] = "ROOT"; +static const char mdd_obf_dir_name[] = "fid"; static int mdd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) @@ -112,6 +115,8 @@ static void mdd_device_shutdown(const struct lu_env *env, ENTRY; mdd_changelog_fini(env, m); dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb); + mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf); + mdd_object_put(env, m->mdd_dot_lustre); if (m->mdd_obd_dev) mdd_fini_obd(env, m, cfg); orph_index_fini(env, m); @@ -127,16 +132,10 @@ static int changelog_init_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr, struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; ENTRY; - if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) { - CERROR("log is not plain\n"); - RETURN(-EINVAL); - } - if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { - CERROR("Not a changelog rec? %d\n", rec->cr_hdr.lrh_type); - RETURN(-EINVAL); - } + LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN); + LASSERT(rec->cr_hdr.lrh_type == CHANGELOG_REC); - CDEBUG(D_INODE, + CDEBUG(D_INFO, "seeing record at index %d/%d/"LPU64" t=%x %.*s in log "LPX64"\n", hdr->lrh_index, rec->cr_hdr.lrh_index, rec->cr_index, rec->cr_type, rec->cr_namelen, rec->cr_name, @@ -146,32 +145,76 @@ static int changelog_init_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr, RETURN(LLOG_PROC_BREAK); } +static int changelog_user_init_cb(struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct mdd_device *mdd = (struct mdd_device *)data; + struct llog_changelog_user_rec *rec = + (struct llog_changelog_user_rec *)hdr; + ENTRY; + + LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN); + LASSERT(rec->cur_hdr.lrh_type == CHANGELOG_USER_REC); + + CDEBUG(D_INFO, "seeing user at index %d/%d id=%d endrec="LPU64 + " in log "LPX64"\n", hdr->lrh_index, rec->cur_hdr.lrh_index, + rec->cur_id, rec->cur_endrec, llh->lgh_id.lgl_oid); + + spin_lock(&mdd->mdd_cl.mc_user_lock); + mdd->mdd_cl.mc_lastuser = rec->cur_id; + spin_unlock(&mdd->mdd_cl.mc_user_lock); + + RETURN(LLOG_PROC_BREAK); +} + + static int mdd_changelog_llog_init(struct mdd_device *mdd) { struct obd_device *obd = mdd2obd_dev(mdd); struct llog_ctxt *ctxt; int rc; + /* Find last changelog entry number */ ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); if (ctxt == NULL) { - CERROR("no context\n"); + CERROR("no changelog context\n"); return -EINVAL; } if (!ctxt->loc_handle) { - CERROR("no handle\n"); + llog_ctxt_put(ctxt); return -EINVAL; } + rc = llog_cat_reverse_process(ctxt->loc_handle, changelog_init_cb, mdd); llog_ctxt_put(ctxt); - if (rc < 0) + if (rc < 0) { CERROR("changelog init failed: %d\n", rc); - else - rc = 0; /* llog_proc_break is ok */ + return rc; + } + CDEBUG(D_INODE, "changelog starting index="LPU64"\n", + mdd->mdd_cl.mc_index); - CDEBUG(D_INODE, "changelog_init index="LPU64"\n", mdd->mdd_cl.mc_index); + /* Find last changelog user id */ + ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT); + if (ctxt == NULL) { + CERROR("no changelog user context\n"); + return -EINVAL; + } + if (!ctxt->loc_handle) { + llog_ctxt_put(ctxt); + return -EINVAL; + } - return rc; + rc = llog_cat_reverse_process(ctxt->loc_handle, changelog_user_init_cb, + mdd); + llog_ctxt_put(ctxt); + + if (rc < 0) { + CERROR("changelog user init failed: %d\n", rc); + return rc; + } + return 0; } static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd) @@ -181,15 +224,18 @@ static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd) mdd->mdd_cl.mc_index = 0; spin_lock_init(&mdd->mdd_cl.mc_lock); cfs_waitq_init(&mdd->mdd_cl.mc_waitq); - mdd->mdd_cl.mc_starttime = cfs_time_current_64(); mdd->mdd_cl.mc_flags = 0; /* off by default */ - mdd->mdd_cl.mc_mask = CL_DEFMASK; + mdd->mdd_cl.mc_mask = CHANGELOG_DEFMASK; + spin_lock_init(&mdd->mdd_cl.mc_user_lock); + mdd->mdd_cl.mc_lastuser = 0; + rc = mdd_changelog_llog_init(mdd); if (rc) { CERROR("Changelog setup during init failed %d\n", rc); mdd->mdd_cl.mc_flags |= CLM_ERR; } + return rc; } @@ -249,19 +295,41 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec) { struct obd_device *obd = mdd2obd_dev(mdd); struct llog_ctxt *ctxt; + long long unsigned cur; int rc; ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); if (ctxt == NULL) return -ENXIO; - /* Some records purged; reset repeat-access time */ + spin_lock(&mdd->mdd_cl.mc_lock); + cur = (long long)mdd->mdd_cl.mc_index; + spin_unlock(&mdd->mdd_cl.mc_lock); + if (endrec > cur) + endrec = cur; + + /* purge to "0" is shorthand for everything */ + if (endrec == 0) + endrec = cur; + + /* If purging all records, write a header entry so we don't have an + empty catalog and we're sure to have a valid starting index next + time. In case of crash, we just restart with old log so we're + allright. */ + if (endrec == cur) { + rc = mdd_changelog_write_header(mdd, CLM_PURGE); + if (rc) + goto out; + } + + /* Some records were purged, so reset repeat-access time (so we + record new mtime update records, so users can see a file has been + changed since the last purge) */ mdd->mdd_cl.mc_starttime = cfs_time_current_64(); rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0); - +out: llog_ctxt_put(ctxt); - return rc; } @@ -300,6 +368,387 @@ int mdd_changelog_write_header(struct mdd_device *mdd, int markerflags) RETURN(rc); } +/** + * Create ".lustre" directory. + */ +static int create_dot_lustre_dir(const struct lu_env *env, struct mdd_device *m) +{ + struct lu_fid *fid = &mdd_env_info(env)->mti_fid; + struct md_object *mdo; + int rc; + + memcpy(fid, &LU_DOT_LUSTRE_FID, sizeof(struct lu_fid)); + mdo = llo_store_create_index(env, &m->mdd_md_dev, m->mdd_child, + mdd_root_dir_name, mdd_dot_lustre_name, + fid, &dt_directory_features); + /* .lustre dir may be already present */ + if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) { + rc = PTR_ERR(mdo); + CERROR("creating obj [%s] fid = "DFID" rc = %d\n", + mdd_dot_lustre_name, PFID(fid), rc); + RETURN(rc); + } + + if (!IS_ERR(mdo)) + lu_object_put(env, &mdo->mo_lu); + + return 0; +} + +static int dot_lustre_attr_get(const struct lu_env *env, struct md_object *obj, + struct md_attr *ma) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + return mdd_attr_get_internal_locked(env, mdd_obj, ma); +} + +static int dot_lustre_attr_set(const struct lu_env *env, struct md_object *obj, + const struct md_attr *ma) +{ + return -EPERM; +} + +static int dot_lustre_xattr_get(const struct lu_env *env, + struct md_object *obj, struct lu_buf *buf, + const char *name) +{ + return 0; +} + +static int dot_lustre_mdd_open(const struct lu_env *env, struct md_object *obj, + int flags) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + mdd_obj->mod_count++; + mdd_write_unlock(env, mdd_obj); + + return 0; +} + +static int dot_lustre_path(const struct lu_env *env, struct md_object *obj, + char *path, int pathlen, __u64 *recno, int *linkno) +{ + return -ENOSYS; +} + +static int dot_lustre_close(const struct lu_env *env, struct md_object *obj, + struct md_attr *ma) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + mdd_obj->mod_count--; + mdd_write_unlock(env, mdd_obj); + + return 0; +} + +static struct md_object_operations mdd_dot_lustre_obj_ops = { + .moo_attr_get = dot_lustre_attr_get, + .moo_attr_set = dot_lustre_attr_set, + .moo_xattr_get = dot_lustre_xattr_get, + .moo_open = dot_lustre_mdd_open, + .moo_close = dot_lustre_close, + .moo_readpage = mdd_readpage, + .moo_path = dot_lustre_path +}; + +static int dot_lustre_lookup(const struct lu_env *env, struct md_object *p, + const struct lu_name *lname, struct lu_fid *f, + struct md_op_spec *spec) +{ + if (strcmp(lname->ln_name, mdd_obf_dir_name) == 0) + *f = LU_OBF_FID; + else + return -ENOENT; + + return 0; +} + +static int dot_lustre_create(const struct lu_env *env, struct md_object *pobj, + const struct lu_name *lname, + struct md_object *child, struct md_op_spec *spec, + struct md_attr* ma) +{ + return -EPERM; +} + +static int dot_lustre_rename(const struct lu_env *env, + struct md_object *src_pobj, + struct md_object *tgt_pobj, + const struct lu_fid *lf, + const struct lu_name *lsname, + struct md_object *tobj, + const struct lu_name *ltname, struct md_attr *ma) +{ + return -EPERM; +} + +static int dot_lustre_link(const struct lu_env *env, struct md_object *tgt_obj, + struct md_object *src_obj, + const struct lu_name *lname, struct md_attr *ma) +{ + return -EPERM; +} + +static int dot_lustre_unlink(const struct lu_env *env, struct md_object *pobj, + struct md_object *cobj, const struct lu_name *lname, + struct md_attr *ma) +{ + return -EPERM; +} + +static struct md_dir_operations mdd_dot_lustre_dir_ops = { + .mdo_lookup = dot_lustre_lookup, + .mdo_create = dot_lustre_create, + .mdo_rename = dot_lustre_rename, + .mdo_link = dot_lustre_link, + .mdo_unlink = dot_lustre_unlink, +}; + +static int obf_attr_get(const struct lu_env *env, struct md_object *obj, + struct md_attr *ma) +{ + int rc = 0; + + if (ma->ma_need & MA_INODE) { + struct mdd_device *mdd = mdo2mdd(obj); + + /* "fid" is a virtual object and hence does not have any "real" + * attributes. So we reuse attributes of .lustre for "fid" dir */ + ma->ma_need |= MA_INODE; + rc = dot_lustre_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma); + if (rc) + return rc; + ma->ma_valid |= MA_INODE; + } + + /* "fid" directory does not have any striping information. */ + if (ma->ma_need & MA_LOV) { + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + if (ma->ma_valid & MA_LOV) + return 0; + + if (!(S_ISREG(mdd_object_type(mdd_obj)) || + S_ISDIR(mdd_object_type(mdd_obj)))) + return 0; + + if (ma->ma_need & MA_LOV_DEF) { + rc = mdd_get_default_md(mdd_obj, ma->ma_lmm, + &ma->ma_lmm_size); + if (rc > 0) { + ma->ma_valid |= MA_LOV; + rc = 0; + } + } + } + + return rc; +} + +static int obf_attr_set(const struct lu_env *env, struct md_object *obj, + const struct md_attr *ma) +{ + return -EPERM; +} + +static int obf_xattr_get(const struct lu_env *env, + struct md_object *obj, struct lu_buf *buf, + const char *name) +{ + return 0; +} + +static int obf_mdd_open(const struct lu_env *env, struct md_object *obj, + int flags) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + mdd_obj->mod_count++; + mdd_write_unlock(env, mdd_obj); + + return 0; +} + +static int obf_mdd_close(const struct lu_env *env, struct md_object *obj, + struct md_attr *ma) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); + mdd_obj->mod_count--; + mdd_write_unlock(env, mdd_obj); + + return 0; +} + +/** Nothing to list in "fid" directory */ +static int obf_mdd_readpage(const struct lu_env *env, struct md_object *obj, + const struct lu_rdpg *rdpg) +{ + return -EPERM; +} + +static int obf_path(const struct lu_env *env, struct md_object *obj, + char *path, int pathlen, __u64 *recno, int *linkno) +{ + return -ENOSYS; +} + +static struct md_object_operations mdd_obf_obj_ops = { + .moo_attr_get = obf_attr_get, + .moo_attr_set = obf_attr_set, + .moo_xattr_get = obf_xattr_get, + .moo_open = obf_mdd_open, + .moo_close = obf_mdd_close, + .moo_readpage = obf_mdd_readpage, + .moo_path = obf_path +}; + +/** + * Lookup method for "fid" object. Only filenames with correct SEQ:OID format + * are valid. We also check if object with passed fid exists or not. + */ +static int obf_lookup(const struct lu_env *env, struct md_object *p, + const struct lu_name *lname, struct lu_fid *f, + struct md_op_spec *spec) +{ + char *name = (char *)lname->ln_name; + struct mdd_device *mdd = mdo2mdd(p); + struct mdd_object *child; + int rc = 0; + + while (*name == '[') + name++; + + sscanf(name, SFID, &(f->f_seq), &(f->f_oid), + &(f->f_ver)); + if (!fid_is_sane(f)) { + CWARN("bad FID format [%s], should be "DFID"\n", lname->ln_name, + (__u64)1, 2, 0); + GOTO(out, rc = -EINVAL); + } + + /* Check if object with this fid exists */ + child = mdd_object_find(env, mdd, f); + if (child == NULL) + GOTO(out, rc = 0); + if (IS_ERR(child)) + GOTO(out, rc = PTR_ERR(child)); + + if (mdd_object_exists(child) == 0) + rc = -ENOENT; + + mdd_object_put(env, child); + +out: + return rc; +} + +static int obf_create(const struct lu_env *env, struct md_object *pobj, + const struct lu_name *lname, struct md_object *child, + struct md_op_spec *spec, struct md_attr* ma) +{ + return -EPERM; +} + +static int obf_rename(const struct lu_env *env, + struct md_object *src_pobj, struct md_object *tgt_pobj, + const struct lu_fid *lf, const struct lu_name *lsname, + struct md_object *tobj, const struct lu_name *ltname, + struct md_attr *ma) +{ + return -EPERM; +} + +static int obf_link(const struct lu_env *env, struct md_object *tgt_obj, + struct md_object *src_obj, const struct lu_name *lname, + struct md_attr *ma) +{ + return -EPERM; +} + +static int obf_unlink(const struct lu_env *env, struct md_object *pobj, + struct md_object *cobj, const struct lu_name *lname, + struct md_attr *ma) +{ + return -EPERM; +} + +static struct md_dir_operations mdd_obf_dir_ops = { + .mdo_lookup = obf_lookup, + .mdo_create = obf_create, + .mdo_rename = obf_rename, + .mdo_link = obf_link, + .mdo_unlink = obf_unlink +}; + +/** + * Create special in-memory "fid" object for open-by-fid. + */ +static int mdd_obf_setup(const struct lu_env *env, struct mdd_device *m) +{ + struct mdd_object *mdd_obf; + struct lu_object *obf_lu_obj; + int rc = 0; + + m->mdd_dot_lustre_objs.mdd_obf = mdd_object_find(env, m, + &LU_OBF_FID); + if (m->mdd_dot_lustre_objs.mdd_obf == NULL || + IS_ERR(m->mdd_dot_lustre_objs.mdd_obf)) + GOTO(out, rc = -ENOENT); + + mdd_obf = m->mdd_dot_lustre_objs.mdd_obf; + mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops; + mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops; + /* Don't allow objects to be created in "fid" dir */ + mdd_obf->mod_flags |= IMMUTE_OBJ; + + obf_lu_obj = mdd2lu_obj(mdd_obf); + obf_lu_obj->lo_header->loh_attr |= (LOHA_EXISTS | S_IFDIR); + +out: + return rc; +} + +/** Setup ".lustre" directory object */ +static int mdd_dot_lustre_setup(const struct lu_env *env, struct mdd_device *m) +{ + struct dt_object *dt_dot_lustre; + struct lu_fid *fid = &mdd_env_info(env)->mti_fid; + int rc; + + rc = create_dot_lustre_dir(env, m); + if (rc) + return rc; + + dt_dot_lustre = dt_store_open(env, m->mdd_child, mdd_root_dir_name, + mdd_dot_lustre_name, fid); + if (IS_ERR(dt_dot_lustre)) { + rc = PTR_ERR(dt_dot_lustre); + GOTO(out, rc); + } + + /* references are released in mdd_device_shutdown() */ + m->mdd_dot_lustre = lu2mdd_obj(lu_object_locate(dt_dot_lustre->do_lu.lo_header, + &mdd_device_type)); + + m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops; + m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops; + + rc = mdd_obf_setup(env, m); + if (rc) + CERROR("Error initializing \"fid\" object - %d.\n", rc); + +out: + RETURN(rc); +} + static int mdd_process_config(const struct lu_env *env, struct lu_device *d, struct lustre_cfg *cfg) { @@ -435,8 +884,17 @@ static int mdd_prepare(const struct lu_env *env, LASSERT(root != NULL); lu_object_put(env, &root->do_lu); rc = orph_index_init(env, mdd); - } else + } else { rc = PTR_ERR(root); + } + if (rc) + GOTO(out, rc); + + rc = mdd_dot_lustre_setup(env, mdd); + if (rc) { + CERROR("Error(%d) initializing .lustre objects\n", rc); + GOTO(out, rc); + } out: RETURN(rc); @@ -522,6 +980,15 @@ static int mdd_update_capa_key(const struct lu_env *env, RETURN(rc); } +static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m, + int idx, void **h) +{ + struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev); + + *h = llog_group_get_ctxt(&mdd2obd_dev(mdd)->obd_olg, idx); + return (*h == NULL ? -ENOENT : 0); +} + static struct lu_device *mdd_device_alloc(const struct lu_env *env, struct lu_device_type *t, struct lustre_cfg *lcfg) @@ -597,6 +1064,202 @@ struct md_capainfo *md_capainfo(const struct lu_env *env) } EXPORT_SYMBOL(md_capainfo); +static int mdd_changelog_user_register(struct mdd_device *mdd, int *id) +{ + struct llog_ctxt *ctxt; + struct llog_changelog_user_rec *rec; + int rc; + ENTRY; + + ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT); + if (ctxt == NULL) + RETURN(-ENXIO); + + OBD_ALLOC_PTR(rec); + if (rec == NULL) { + llog_ctxt_put(ctxt); + RETURN(-ENOMEM); + } + + rec->cur_hdr.lrh_len = sizeof(*rec); + rec->cur_hdr.lrh_type = CHANGELOG_USER_REC; + rec->cur_endrec = 0ULL; + spin_lock(&mdd->mdd_cl.mc_user_lock); + if (mdd->mdd_cl.mc_lastuser == (unsigned int)(-1)) { + spin_unlock(&mdd->mdd_cl.mc_user_lock); + CERROR("Maximum number of changelog users exceeded!\n"); + GOTO(out, rc = -EOVERFLOW); + } + *id = rec->cur_id = ++mdd->mdd_cl.mc_lastuser; + spin_unlock(&mdd->mdd_cl.mc_user_lock); + rc = llog_add(ctxt, &rec->cur_hdr, NULL, NULL, 0); + + CDEBUG(D_INODE, "Registered changelog user %d\n", *id); +out: + OBD_FREE_PTR(rec); + llog_ctxt_put(ctxt); + RETURN(rc); +} + +struct mdd_changelog_user_data { + __u64 mcud_endrec; /**< purge record for this user */ + __u64 mcud_minrec; /**< lowest changelog recno still referenced */ + __u32 mcud_id; + __u32 mcud_minid; /**< user id with lowest rec reference */ + int mcud_found:1; +}; + +/** Two things: + * 1. Find the smallest record everyone is willing to purge + * 2. Update the last purgeable record for this user + */ +static int mdd_changelog_user_purge_cb(struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct llog_changelog_user_rec *rec; + struct mdd_changelog_user_data *mcud = + (struct mdd_changelog_user_data *)data; + int rc; + ENTRY; + + LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN); + + rec = (struct llog_changelog_user_rec *)hdr; + + /* If we have a new endrec for this id, use it for the min check */ + if (rec->cur_id == mcud->mcud_id) + rec->cur_endrec = max(rec->cur_endrec, mcud->mcud_endrec); + + /* Track the minimum referenced record */ + if (mcud->mcud_minid == 0 || mcud->mcud_minrec > rec->cur_endrec) { + mcud->mcud_minid = rec->cur_id; + mcud->mcud_minrec = rec->cur_endrec; + } + + if (rec->cur_id != mcud->mcud_id) + RETURN(0); + + /* Update this user's record */ + mcud->mcud_found = 1; + + /* Special case: unregister this user if endrec == -1 */ + if (mcud->mcud_endrec == -1) { + struct llog_cookie cookie; + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_index = hdr->lrh_index; + rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, + 1, &cookie); + RETURN(rc); + } + + /* Update the endrec */ + CDEBUG(D_IOCTL, "Rewriting changelog user %d endrec to "LPU64"\n", + mcud->mcud_id, rec->cur_endrec); + + /* hdr+1 is loc of data */ + hdr->lrh_len -= sizeof(*hdr) + sizeof(struct llog_rec_tail); + rc = llog_write_rec(llh, hdr, NULL, 0, (void *)(hdr + 1), + hdr->lrh_index); + + RETURN(rc); +} + +static int mdd_changelog_user_purge(struct mdd_device *mdd, int id, + long long endrec) +{ + struct mdd_changelog_user_data data; + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + CDEBUG(D_IOCTL, "Purge request: id=%d, endrec="LPD64"\n", id, endrec); + + ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT); + if (ctxt == NULL) + return -ENXIO; + LASSERT(ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT); + + data.mcud_id = id; + data.mcud_endrec = endrec; + data.mcud_minid = 0; + data.mcud_minrec = 0; + rc = llog_cat_process(ctxt->loc_handle, mdd_changelog_user_purge_cb, + (void *)&data, 0, 0); + if ((rc >= 0) && (data.mcud_minrec > 0)) { + CDEBUG(D_INODE, "Purging CL entries up to "LPD64 + ", referenced by "CHANGELOG_USER_PREFIX"%d\n", + data.mcud_minrec, data.mcud_minid); + rc = mdd_changelog_llog_cancel(mdd, data.mcud_minrec); + } else { + CWARN("Could not determine changelog records to purge; rc=%d\n", + rc); + } + + if (!data.mcud_found) { + CWARN("No entry for user %d. Last changelog reference is " + LPD64" by changelog user %d\n", data.mcud_id, + data.mcud_minrec, data.mcud_minid); + rc = -ENOENT; + } + + llog_ctxt_put(ctxt); + RETURN (rc); +} + +/** mdd_iocontrol + * May be called remotely from mdt_iocontrol_handle or locally from + * mdt_iocontrol. Data may be freeform - remote handling doesn't enforce or + * swab an obd_ioctl_data format (but local ioctl handler does). + * \param cmd - ioc + * \param len - data len + * \param karg - ioctl data, in kernel space + */ +static int mdd_iocontrol(const struct lu_env *env, struct md_device *m, + unsigned int cmd, int len, void *karg) +{ + struct mdd_device *mdd; + struct obd_ioctl_data *data = karg; + int rc; + ENTRY; + + mdd = lu2mdd_dev(&m->md_lu_dev); + + /* Doesn't use obd_ioctl_data */ + if (cmd == OBD_IOC_CHANGELOG_CLEAR) { + struct changelog_setinfo *cs = karg; + if (len != sizeof(*cs)) { + CERROR("Bad changelog_clear ioctl size %d\n", len); + RETURN(-EINVAL); + } + rc = mdd_changelog_user_purge(mdd, cs->cs_id, cs->cs_recno); + RETURN(rc); + } + + /* Below ioctls use obd_ioctl_data */ + if (len != sizeof(*data)) { + CERROR("Bad ioctl size %d\n", len); + RETURN(-EINVAL); + } + if (data->ioc_version != OBD_IOCTL_VERSION) { + CERROR("Bad magic %x != %x\n", data->ioc_version, + OBD_IOCTL_VERSION); + RETURN(-EINVAL); + } + + switch (cmd) { + case OBD_IOC_CHANGELOG_REG: + rc = mdd_changelog_user_register(mdd, &data->ioc_u32_1); + break; + case OBD_IOC_CHANGELOG_DEREG: + rc = mdd_changelog_user_purge(mdd, data->ioc_u32_1, -1); + break; + default: + rc = -EOPNOTSUPP; + } + + RETURN (rc); +} + /* type constructor/destructor: mdd_type_init, mdd_type_fini */ LU_TYPE_INIT_FINI(mdd, &mdd_thread_key, &mdd_ucred_key, &mdd_capainfo_key); @@ -606,6 +1269,8 @@ const struct md_device_operations mdd_ops = { .mdo_maxsize_get = mdd_maxsize_get, .mdo_init_capa_ctxt = mdd_init_capa_ctxt, .mdo_update_capa_key= mdd_update_capa_key, + .mdo_llog_ctxt_get = mdd_llog_ctxt_get, + .mdo_iocontrol = mdd_iocontrol, #ifdef HAVE_QUOTA_SUPPORT .mdo_quota = { .mqo_notify = mdd_quota_notify,