From a84b5d47627c6727ae7ec7c8895677e6f08954cb Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 9 Jun 2009 08:57:15 +0000 Subject: [PATCH] - iterators are enabled in dmu osd - osd_shutdown() purges lu object cache - few empty methods to work with VBR and COS enabled lustre - osd_prepare() to call MDD to initialize MDS on-disk layout --- lustre/dmu-osd/osd_handler.c | 239 +++++++++++++++++++++++++++---------------- 1 file changed, 149 insertions(+), 90 deletions(-) diff --git a/lustre/dmu-osd/osd_handler.c b/lustre/dmu-osd/osd_handler.c index 8c43c6b..a160f5a 100644 --- a/lustre/dmu-osd/osd_handler.c +++ b/lustre/dmu-osd/osd_handler.c @@ -1014,8 +1014,12 @@ static int osd_declare_attr_set(const struct lu_env *env, struct osd_thandle *oh; ENTRY; + if (!dt_object_exists(dt)) { + /* XXX: sanity check that object creation is declared */ + RETURN(0); + } + LASSERT(handle != NULL); - LASSERT(dt_object_exists(dt)); LASSERT(osd_invariant(obj)); oh = container_of0(handle, struct osd_thandle, ot_super); @@ -1157,16 +1161,17 @@ static int osd_declare_object_create(const struct lu_env *env, oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh != NULL); LASSERT(oh->ot_tx != NULL); + LASSERT(dof); switch (dof->dof_type) { case DFT_DIR: + case DFT_INDEX: /* for zap create */ udmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL); break; case DFT_REGULAR: case DFT_SYM: case DFT_NODE: - case DFT_INDEX: /* first, we'll create new object */ udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT); break; @@ -1190,9 +1195,9 @@ static dmu_buf_t * osd_mkdir(struct osd_thread_info *info, struct osd_device *o struct lu_attr *attr, struct osd_thandle *oh) { - dmu_buf_t * db; + dmu_buf_t *db; - LASSERT(S_ISDIR(attr->la_mode)); + /* XXX: LASSERT(S_ISDIR(attr->la_mode)); */ udmu_zap_create(&osd->od_objset, &db, oh->ot_tx, osd_object_tag); return db; @@ -1313,15 +1318,15 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, db = osd_create_type_f(dof->dof_type)(info, osd, attr, oh); if(IS_ERR(db)) - RETURN (PTR_ERR(th)); + RETURN(PTR_ERR(th)); oid = udmu_object_get_id(db); /* XXX: zapdb should be replaced with zap-mapping-fids-to-dnode */ rc = udmu_zap_insert(&osd->od_objset, zapdb, oh->ot_tx, buf, &oid, sizeof (oid)); - if(rc) - goto out; + if (rc) + GOTO(out, rc); obj->oo_db = db; @@ -1330,8 +1335,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, udmu_object_getattr(db, &vap); vnattr2lu_attr(&vap, attr); - CDEBUG(D_OTHER, "create object %s oid["LPD64"] (objid %llu)\n", - buf, oid, vap.va_nodeid); + CDEBUG(D_OTHER, "create object "DFID" (dnode %llu)\n", + PFID(fid), oid); rc = osd_create_post(info, obj, attr, th); @@ -1348,20 +1353,22 @@ struct osd_zap_it { zap_cursor_t *ozi_zc; struct osd_object *ozi_obj; struct lustre_capa *ozi_capa; - char ozi_name[NAME_MAX+1]; + char ozi_name[NAME_MAX + 1]; char ozi_rec[IT_REC_SIZE]; }; static struct dt_it *osd_zap_it_init(const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *capa) + struct dt_object *dt, + struct lustre_capa *capa) { struct osd_zap_it *it; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct lu_object *lo = &dt->do_lu; - ENTRY; + + /* XXX: check capa ? */ + LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_db); LASSERT(udmu_object_is_zap(obj->oo_db)); @@ -1382,22 +1389,43 @@ static struct dt_it *osd_zap_it_init(const struct lu_env *env, static void osd_zap_it_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_zap_it *it = (struct osd_zap_it *)di; - struct osd_object *obj = it->ozi_obj; + struct osd_zap_it *it = (struct osd_zap_it *)di; + struct osd_object *obj; + ENTRY; + + LASSERT(it); + LASSERT(it->ozi_obj); + + obj = it->ozi_obj; udmu_zap_cursor_fini(it->ozi_zc); lu_object_put(env, &obj->oo_dt.do_lu); OBD_FREE_PTR(it); + EXIT; } +/** + * Move Iterator to record specified by \a key + * + * \param di osd iterator + * \param key key for index + * + * \retval +ve di points to record with least key not larger than key + * \retval 0 di points to exact matched key + * \retval -ve failure + */ + static int osd_zap_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) + struct dt_it *di, const struct dt_key *key) { - int rc; - struct osd_zap_it *it = (struct osd_zap_it *)di; - + struct osd_zap_it *it = (struct osd_zap_it *)di; + int rc; ENTRY; + + LASSERT(it); + LASSERT(it->ozi_zc); + rc = udmu_zap_cursor_move_to_key(it->ozi_zc, (const char *) key); if (rc == 0) /* if record exist return +1 */ RETURN(1); @@ -1416,9 +1444,9 @@ static void osd_zap_it_put(const struct lu_env *env, struct dt_it *di) static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di) { struct osd_zap_it *it = (struct osd_zap_it *)di; - //int rc; - + int rc; ENTRY; + udmu_zap_cursor_advance(it->ozi_zc); /* According to current API we need to return error if its last entry. @@ -1427,65 +1455,38 @@ static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di) * We shld make changes to Iterator API to not return status for this API * */ - /* XXX: not implemented yet */ - RETURN(0); - LBUG(); -#if 0 - rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NAME_MAX); + rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX); if (rc == ENOENT) /* end of dir*/ RETURN(+1); RETURN((-rc)); -#endif } -#if 0 -static int osd_zap_it_del(const struct lu_env *env, struct dt_it *di, - struct thandle *th) -{ - /* PBS: not called from anywhere , shld be removed from Iterator APIs - * */ - LBUG(); - - RETURN(0); -} -#endif - static struct dt_key *osd_zap_it_key(const struct lu_env *env, const struct dt_it *di) { - //struct osd_zap_it *it = (struct osd_zap_it *)di; - //int rc = 0; - + struct osd_zap_it *it = (struct osd_zap_it *)di; + int rc = 0; ENTRY; - /* XXX: not impelemented yet */ - LBUG(); - RETURN(NULL); -#if 0 - rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1); + + rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX + 1); if (!rc) RETURN((struct dt_key *)it->ozi_name); else RETURN(ERR_PTR(-rc)); -#endif } static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di) { - //struct osd_zap_it *it = (struct osd_zap_it *)di; - //int rc = 0; - + struct osd_zap_it *it = (struct osd_zap_it *)di; + int rc; ENTRY; - /* XXX: not implemented yet */ - LBUG(); - RETURN(0); -#if 0 + rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1); if (!rc) RETURN(strlen(it->ozi_name)); else RETURN(-rc); -#endif } @@ -1521,27 +1522,23 @@ static __u64 osd_zap_it_store(const struct lu_env *env, const struct dt_it *di) static int osd_zap_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) { - //struct osd_zap_it *it = (struct osd_zap_it *)di; - //struct osd_object *obj = it->ozi_obj; - //int rc; - + struct osd_zap_it *it = (struct osd_zap_it *)di; + struct osd_object *obj = it->ozi_obj; + int rc; ENTRY; - /* XXX: not implemented yet */ - LBUG(); - RETURN(0); -#if 0 + udmu_zap_cursor_init_serialized(it->ozi_zc, &osd_obj2dev(obj)->od_objset, udmu_object_get_id(obj->oo_db), hash); /* same as osd_zap_it_next()*/ - rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX); + rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX + 1); if (rc == 0) RETURN(+1); + if (rc == ENOENT) /* end of dir*/ RETURN(0); RETURN(-rc); -#endif } static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, @@ -1553,14 +1550,14 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, struct lu_fid_pack *pack; struct lu_fid *fid; dmu_buf_t *zapdb = obj->oo_db; - //dmu_buf_t *db; uint64_t oid; int rc; ENTRY; LASSERT(udmu_object_is_zap(obj->oo_db)); - if (osd_object_is_root(obj)) { + /* XXX: to decide on format of / yet */ + if (0 && osd_object_is_root(obj)) { rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key, &oid, sizeof(uint64_t), sizeof(uint64_t)); if (rc) { @@ -1577,7 +1574,8 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key, rec, 17, 1); } - RETURN(-rc); + + RETURN(rc == 0 ? 1 : -rc); } static int osd_declare_index_insert(const struct lu_env *env, @@ -1728,6 +1726,10 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, { struct osd_object *obj = osd_dt_obj(dt); LASSERT(obj->oo_db != NULL); + /* + * XXX: implement support for fixed-size keys sorted with natural + * numerical way (not using internal hash value) + */ if (udmu_object_is_zap(obj->oo_db)) dt->do_index_ops = &osd_index_ops; return 0; @@ -1803,11 +1805,11 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name, struct lustre_capa *capa) { - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - int rc, size; - + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); + int rc, size; ENTRY; + LASSERT(obj->oo_db != NULL); LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); @@ -1817,9 +1819,9 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, buf->lb_len, name, &size); up(&obj->oo_guard); - if(rc == -ENOENT) + if (rc == -ENOENT) rc = -ENODATA; - if(rc == 0) + if (rc == 0) rc = size; RETURN(rc); } @@ -1834,16 +1836,23 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(handle != NULL); + oh = container_of0(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_tx != NULL); + + if (!dt_object_exists(dt)) { + udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT); + udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT); + RETURN(0); + } + LASSERT(obj->oo_db != NULL); LASSERT(dt_object_exists(dt)); LASSERT(osd_invariant(obj)); - oh = container_of0(handle, struct osd_thandle, ot_super); - LASSERT(oh->ot_tx != NULL); down(&obj->oo_guard); - udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen, name, - oh->ot_tx); + udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen, + name, oh->ot_tx); up(&obj->oo_guard); RETURN(0); @@ -1857,9 +1866,9 @@ int osd_xattr_set(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; - int rc; - + int rc = 0; ENTRY; + LASSERT(handle != NULL); LASSERT(obj->oo_db != NULL); LASSERT(osd_invariant(obj)); @@ -1906,9 +1915,9 @@ int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; - int rc; - + int rc; ENTRY; + LASSERT(handle != NULL); LASSERT(obj->oo_db != NULL); LASSERT(osd_invariant(obj)); @@ -1927,18 +1936,18 @@ int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, struct lustre_capa *capa) { - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - int rc; - + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); + int rc; ENTRY; + LASSERT(obj->oo_db != NULL); LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); down(&obj->oo_guard); - rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db, buf->lb_buf, - buf->lb_len); + rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db, + buf->lb_buf, buf->lb_len); up(&obj->oo_guard); RETURN(rc); @@ -2088,6 +2097,41 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env, RETURN(oc); } +static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) +{ + ENTRY; + + /* XXX: not implemented yet, important for COS */ + + RETURN(0); +} + +static dt_obj_version_t osd_object_version_get(const struct lu_env *env, + struct dt_object *dt) +{ + /* XXX: not implemented yet, important for VBR */ + return 0; +} + +/* + * Set the 64-bit version and return the old version. + */ +static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, + dt_obj_version_t new_version) +{ + /* XXX: not implemented yet, important for VBR */ + return; +} + +static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data) +{ + ENTRY; + + /* XXX: not implemented yet, important for quota */ + *data = NULL; + RETURN(0); +} + static struct dt_object_operations osd_obj_ops = { .do_read_lock = osd_object_read_lock, .do_write_lock = osd_object_write_lock, @@ -2113,6 +2157,10 @@ static struct dt_object_operations osd_obj_ops = { .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, + .do_data_get = osd_data_get, }; /* @@ -2455,6 +2503,8 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d, static int osd_shutdown(const struct lu_env *env, struct osd_device *o) { ENTRY; + if (o->od_dt_dev.dd_lu_dev.ld_site) + lu_site_purge(env, o->od_dt_dev.dd_lu_dev.ld_site, ~0); if (o->od_objdir_db) udmu_object_put_dmu_buf(o->od_objdir_db, objdir_tag); @@ -2675,7 +2725,16 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev, struct lu_device *dev) { - RETURN(0); + int rc = 0; + ENTRY; + + if (lu_device_is_md(pdev)) { + /* 2. setup local objects */ + rc = llo_local_objects_setup(env, lu2md_dev(pdev), + lu2dt_dev(dev)); + } + + RETURN(rc); } /* -- 1.8.3.1