struct osd_thandle *oh;
ENTRY;
+ if (!dt_object_exists(dt)) {
+ /* XXX: sanity check that object creation is declared */
+ RETURN(0);
+ }
+
LASSERT(handle != NULL);
- LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
oh = container_of0(handle, struct osd_thandle, ot_super);
oh = container_of0(handle, struct osd_thandle, ot_super);
LASSERT(oh != NULL);
LASSERT(oh->ot_tx != NULL);
+ LASSERT(dof);
switch (dof->dof_type) {
case DFT_DIR:
+ case DFT_INDEX:
/* for zap create */
udmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
break;
case DFT_REGULAR:
case DFT_SYM:
case DFT_NODE:
- case DFT_INDEX:
/* first, we'll create new object */
udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
break;
struct lu_attr *attr,
struct osd_thandle *oh)
{
- dmu_buf_t * db;
+ dmu_buf_t *db;
- LASSERT(S_ISDIR(attr->la_mode));
+ /* XXX: LASSERT(S_ISDIR(attr->la_mode)); */
udmu_zap_create(&osd->od_objset, &db, oh->ot_tx, osd_object_tag);
return db;
db = osd_create_type_f(dof->dof_type)(info, osd, attr, oh);
if(IS_ERR(db))
- RETURN (PTR_ERR(th));
+ RETURN(PTR_ERR(th));
oid = udmu_object_get_id(db);
/* XXX: zapdb should be replaced with zap-mapping-fids-to-dnode */
rc = udmu_zap_insert(&osd->od_objset, zapdb, oh->ot_tx, buf,
&oid, sizeof (oid));
- if(rc)
- goto out;
+ if (rc)
+ GOTO(out, rc);
obj->oo_db = db;
udmu_object_getattr(db, &vap);
vnattr2lu_attr(&vap, attr);
- CDEBUG(D_OTHER, "create object %s oid["LPD64"] (objid %llu)\n",
- buf, oid, vap.va_nodeid);
+ CDEBUG(D_OTHER, "create object "DFID" (dnode %llu)\n",
+ PFID(fid), oid);
rc = osd_create_post(info, obj, attr, th);
zap_cursor_t *ozi_zc;
struct osd_object *ozi_obj;
struct lustre_capa *ozi_capa;
- char ozi_name[NAME_MAX+1];
+ char ozi_name[NAME_MAX + 1];
char ozi_rec[IT_REC_SIZE];
};
static struct dt_it *osd_zap_it_init(const struct lu_env *env,
- struct dt_object *dt,
- struct lustre_capa *capa)
+ struct dt_object *dt,
+ struct lustre_capa *capa)
{
struct osd_zap_it *it;
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct lu_object *lo = &dt->do_lu;
-
ENTRY;
+
+ /* XXX: check capa ? */
+
LASSERT(lu_object_exists(lo));
LASSERT(obj->oo_db);
LASSERT(udmu_object_is_zap(obj->oo_db));
static void osd_zap_it_fini(const struct lu_env *env, struct dt_it *di)
{
- struct osd_zap_it *it = (struct osd_zap_it *)di;
- struct osd_object *obj = it->ozi_obj;
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ struct osd_object *obj;
+ ENTRY;
+
+ LASSERT(it);
+ LASSERT(it->ozi_obj);
+
+ obj = it->ozi_obj;
udmu_zap_cursor_fini(it->ozi_zc);
lu_object_put(env, &obj->oo_dt.do_lu);
OBD_FREE_PTR(it);
+ EXIT;
}
+/**
+ * Move Iterator to record specified by \a key
+ *
+ * \param di osd iterator
+ * \param key key for index
+ *
+ * \retval +ve di points to record with least key not larger than key
+ * \retval 0 di points to exact matched key
+ * \retval -ve failure
+ */
+
static int osd_zap_it_get(const struct lu_env *env,
- struct dt_it *di, const struct dt_key *key)
+ struct dt_it *di, const struct dt_key *key)
{
- int rc;
- struct osd_zap_it *it = (struct osd_zap_it *)di;
-
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ int rc;
ENTRY;
+
+ LASSERT(it);
+ LASSERT(it->ozi_zc);
+
rc = udmu_zap_cursor_move_to_key(it->ozi_zc, (const char *) key);
if (rc == 0) /* if record exist return +1 */
RETURN(1);
static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
{
struct osd_zap_it *it = (struct osd_zap_it *)di;
- //int rc;
-
+ int rc;
ENTRY;
+
udmu_zap_cursor_advance(it->ozi_zc);
/* According to current API we need to return error if its last entry.
* We shld make changes to Iterator API to not return status for this API
* */
- /* XXX: not implemented yet */
- RETURN(0);
- LBUG();
-#if 0
- rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NAME_MAX);
+ rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
if (rc == ENOENT) /* end of dir*/
RETURN(+1);
RETURN((-rc));
-#endif
}
-#if 0
-static int osd_zap_it_del(const struct lu_env *env, struct dt_it *di,
- struct thandle *th)
-{
- /* PBS: not called from anywhere , shld be removed from Iterator APIs
- * */
- LBUG();
-
- RETURN(0);
-}
-#endif
-
static struct dt_key *osd_zap_it_key(const struct lu_env *env,
const struct dt_it *di)
{
- //struct osd_zap_it *it = (struct osd_zap_it *)di;
- //int rc = 0;
-
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ int rc = 0;
ENTRY;
- /* XXX: not impelemented yet */
- LBUG();
- RETURN(NULL);
-#if 0
- rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
+
+ rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX + 1);
if (!rc)
RETURN((struct dt_key *)it->ozi_name);
else
RETURN(ERR_PTR(-rc));
-#endif
}
static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di)
{
- //struct osd_zap_it *it = (struct osd_zap_it *)di;
- //int rc = 0;
-
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ int rc;
ENTRY;
- /* XXX: not implemented yet */
- LBUG();
- RETURN(0);
-#if 0
+
rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
if (!rc)
RETURN(strlen(it->ozi_name));
else
RETURN(-rc);
-#endif
}
static int osd_zap_it_load(const struct lu_env *env,
const struct dt_it *di, __u64 hash)
{
- //struct osd_zap_it *it = (struct osd_zap_it *)di;
- //struct osd_object *obj = it->ozi_obj;
- //int rc;
-
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ struct osd_object *obj = it->ozi_obj;
+ int rc;
ENTRY;
- /* XXX: not implemented yet */
- LBUG();
- RETURN(0);
-#if 0
+
udmu_zap_cursor_init_serialized(it->ozi_zc, &osd_obj2dev(obj)->od_objset,
udmu_object_get_id(obj->oo_db), hash);
/* same as osd_zap_it_next()*/
- rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
+ rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX + 1);
if (rc == 0)
RETURN(+1);
+
if (rc == ENOENT) /* end of dir*/
RETURN(0);
RETURN(-rc);
-#endif
}
static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
struct lu_fid_pack *pack;
struct lu_fid *fid;
dmu_buf_t *zapdb = obj->oo_db;
- //dmu_buf_t *db;
uint64_t oid;
int rc;
ENTRY;
LASSERT(udmu_object_is_zap(obj->oo_db));
- if (osd_object_is_root(obj)) {
+ /* XXX: to decide on format of / yet */
+ if (0 && osd_object_is_root(obj)) {
rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key, &oid,
sizeof(uint64_t), sizeof(uint64_t));
if (rc) {
rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key,
rec, 17, 1);
}
- RETURN(-rc);
+
+ RETURN(rc == 0 ? 1 : -rc);
}
static int osd_declare_index_insert(const struct lu_env *env,
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(obj->oo_db != NULL);
+ /*
+ * XXX: implement support for fixed-size keys sorted with natural
+ * numerical way (not using internal hash value)
+ */
if (udmu_object_is_zap(obj->oo_db))
dt->do_index_ops = &osd_index_ops;
return 0;
struct lu_buf *buf, const char *name,
struct lustre_capa *capa)
{
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
- int rc, size;
-
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
+ int rc, size;
ENTRY;
+
LASSERT(obj->oo_db != NULL);
LASSERT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
buf->lb_len, name, &size);
up(&obj->oo_guard);
- if(rc == -ENOENT)
+ if (rc == -ENOENT)
rc = -ENODATA;
- if(rc == 0)
+ if (rc == 0)
rc = size;
RETURN(rc);
}
ENTRY;
LASSERT(handle != NULL);
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_tx != NULL);
+
+ if (!dt_object_exists(dt)) {
+ udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
+ udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
+ RETURN(0);
+ }
+
LASSERT(obj->oo_db != NULL);
LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
- oh = container_of0(handle, struct osd_thandle, ot_super);
- LASSERT(oh->ot_tx != NULL);
down(&obj->oo_guard);
- udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen, name,
- oh->ot_tx);
+ udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen,
+ name, oh->ot_tx);
up(&obj->oo_guard);
RETURN(0);
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
- int rc;
-
+ int rc = 0;
ENTRY;
+
LASSERT(handle != NULL);
LASSERT(obj->oo_db != NULL);
LASSERT(osd_invariant(obj));
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
- int rc;
-
+ int rc;
ENTRY;
+
LASSERT(handle != NULL);
LASSERT(obj->oo_db != NULL);
LASSERT(osd_invariant(obj));
struct dt_object *dt, struct lu_buf *buf,
struct lustre_capa *capa)
{
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
- int rc;
-
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
+ int rc;
ENTRY;
+
LASSERT(obj->oo_db != NULL);
LASSERT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
down(&obj->oo_guard);
- rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db, buf->lb_buf,
- buf->lb_len);
+ rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db,
+ buf->lb_buf, buf->lb_len);
up(&obj->oo_guard);
RETURN(rc);
RETURN(oc);
}
+static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
+{
+ ENTRY;
+
+ /* XXX: not implemented yet, important for COS */
+
+ RETURN(0);
+}
+
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ /* XXX: not implemented yet, important for VBR */
+ return 0;
+}
+
+/*
+ * Set the 64-bit version and return the old version.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+ dt_obj_version_t new_version)
+{
+ /* XXX: not implemented yet, important for VBR */
+ return;
+}
+
+static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data)
+{
+ ENTRY;
+
+ /* XXX: not implemented yet, important for quota */
+ *data = NULL;
+ RETURN(0);
+}
+
static struct dt_object_operations osd_obj_ops = {
.do_read_lock = osd_object_read_lock,
.do_write_lock = osd_object_write_lock,
.do_xattr_del = osd_xattr_del,
.do_xattr_list = osd_xattr_list,
.do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
+ .do_data_get = osd_data_get,
};
/*
static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
{
ENTRY;
+ if (o->od_dt_dev.dd_lu_dev.ld_site)
+ lu_site_purge(env, o->od_dt_dev.dd_lu_dev.ld_site, ~0);
if (o->od_objdir_db)
udmu_object_put_dmu_buf(o->od_objdir_db, objdir_tag);
struct lu_device *pdev,
struct lu_device *dev)
{
- RETURN(0);
+ int rc = 0;
+ ENTRY;
+
+ if (lu_device_is_md(pdev)) {
+ /* 2. setup local objects */
+ rc = llo_local_objects_setup(env, lu2md_dev(pdev),
+ lu2dt_dev(dev));
+ }
+
+ RETURN(rc);
}
/*