Whamcloud - gitweb
- iterators are enabled in dmu osd
authoralex <alex>
Tue, 9 Jun 2009 08:57:15 +0000 (08:57 +0000)
committeralex <alex>
Tue, 9 Jun 2009 08:57:15 +0000 (08:57 +0000)
- osd_shutdown() purges lu object cache
- few empty methods to work with VBR and COS enabled lustre
- osd_prepare() to call MDD to initialize MDS on-disk layout

lustre/dmu-osd/osd_handler.c

index 8c43c6b..a160f5a 100644 (file)
@@ -1014,8 +1014,12 @@ static int osd_declare_attr_set(const struct lu_env *env,
         struct osd_thandle *oh;
         ENTRY;
 
+        if (!dt_object_exists(dt)) {
+                /* XXX: sanity check that object creation is declared */
+                RETURN(0);
+        }
+
         LASSERT(handle != NULL);
-        LASSERT(dt_object_exists(dt));
         LASSERT(osd_invariant(obj));
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -1157,16 +1161,17 @@ static int osd_declare_object_create(const struct lu_env *env,
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh != NULL);
         LASSERT(oh->ot_tx != NULL);
+        LASSERT(dof);
 
         switch (dof->dof_type) {
                 case DFT_DIR:
+                case DFT_INDEX:
                         /* for zap create */
                         udmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
                         break;
                 case DFT_REGULAR:
                 case DFT_SYM:
                 case DFT_NODE:
-                case DFT_INDEX:
                         /* first, we'll create new object */
                         udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
                         break;
@@ -1190,9 +1195,9 @@ static dmu_buf_t * osd_mkdir(struct osd_thread_info *info, struct osd_device  *o
                      struct lu_attr *attr,
                      struct osd_thandle *oh)
 {
-        dmu_buf_t * db;
+        dmu_buf_t *db;
 
-        LASSERT(S_ISDIR(attr->la_mode));
+        /* XXX: LASSERT(S_ISDIR(attr->la_mode)); */
         udmu_zap_create(&osd->od_objset, &db, oh->ot_tx, osd_object_tag);
 
         return db;
@@ -1313,15 +1318,15 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         db = osd_create_type_f(dof->dof_type)(info, osd, attr, oh);
 
         if(IS_ERR(db))
-                RETURN (PTR_ERR(th));
+                RETURN(PTR_ERR(th));
 
         oid = udmu_object_get_id(db);
 
         /* XXX: zapdb should be replaced with zap-mapping-fids-to-dnode */
         rc = udmu_zap_insert(&osd->od_objset, zapdb, oh->ot_tx, buf,
                              &oid, sizeof (oid));
-        if(rc)
-                goto out;
+        if (rc)
+                GOTO(out, rc);
 
         obj->oo_db = db;
 
@@ -1330,8 +1335,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         udmu_object_getattr(db, &vap);
         vnattr2lu_attr(&vap, attr);
 
-        CDEBUG(D_OTHER, "create object %s oid["LPD64"] (objid %llu)\n",
-               buf, oid, vap.va_nodeid);
+        CDEBUG(D_OTHER, "create object "DFID" (dnode %llu)\n",
+               PFID(fid), oid);
 
         rc = osd_create_post(info, obj, attr, th);
 
@@ -1348,20 +1353,22 @@ struct osd_zap_it {
         zap_cursor_t            *ozi_zc;
         struct osd_object       *ozi_obj;
         struct lustre_capa      *ozi_capa;
-        char                     ozi_name[NAME_MAX+1];
+        char                     ozi_name[NAME_MAX + 1];
         char                     ozi_rec[IT_REC_SIZE];
 };
 
 static struct dt_it *osd_zap_it_init(const struct lu_env *env,
-                struct dt_object *dt,
-                struct lustre_capa *capa)
+                                     struct dt_object *dt,
+                                     struct lustre_capa *capa)
 {
         struct osd_zap_it       *it;
         struct osd_object       *obj = osd_dt_obj(dt);
         struct osd_device       *osd = osd_obj2dev(obj);
         struct lu_object        *lo  = &dt->do_lu;
-
         ENTRY;
+
+        /* XXX: check capa ? */
+
         LASSERT(lu_object_exists(lo));
         LASSERT(obj->oo_db);
         LASSERT(udmu_object_is_zap(obj->oo_db));
@@ -1382,22 +1389,43 @@ static struct dt_it *osd_zap_it_init(const struct lu_env *env,
 
 static void osd_zap_it_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_zap_it     *it = (struct osd_zap_it *)di;
-        struct osd_object *obj = it->ozi_obj;
+        struct osd_zap_it *it = (struct osd_zap_it *)di;
+        struct osd_object *obj;
+        ENTRY;
+
+        LASSERT(it);
+        LASSERT(it->ozi_obj);
+
+        obj = it->ozi_obj;
 
         udmu_zap_cursor_fini(it->ozi_zc);
         lu_object_put(env, &obj->oo_dt.do_lu);
 
         OBD_FREE_PTR(it);
+        EXIT;
 }
 
+/**
+ *  Move Iterator to record specified by \a key
+ *
+ *  \param  di      osd iterator
+ *  \param  key     key for index
+ *
+ *  \retval +ve  di points to record with least key not larger than key
+ *  \retval  0   di points to exact matched key
+ *  \retval -ve  failure
+ */
+
 static int osd_zap_it_get(const struct lu_env *env,
-                struct dt_it *di, const struct dt_key *key)
+                          struct dt_it *di, const struct dt_key *key)
 {
-        int rc;
-        struct osd_zap_it     *it = (struct osd_zap_it *)di;
-
+        struct osd_zap_it *it = (struct osd_zap_it *)di;
+        int                rc;
         ENTRY;
+
+        LASSERT(it);
+        LASSERT(it->ozi_zc);
+
         rc = udmu_zap_cursor_move_to_key(it->ozi_zc, (const char *) key);
         if (rc == 0)   /* if record exist return +1 */
                 RETURN(1);
@@ -1416,9 +1444,9 @@ static void osd_zap_it_put(const struct lu_env *env, struct dt_it *di)
 static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
 {
         struct osd_zap_it *it = (struct osd_zap_it *)di;
-        //int rc;
-
+        int                rc;
         ENTRY;
+
         udmu_zap_cursor_advance(it->ozi_zc);
 
         /* According to current API we need to return error if its last entry.
@@ -1427,65 +1455,38 @@ static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
          * We shld make changes to Iterator API to not return status for this API
          * */
 
-        /* XXX: not implemented yet */
-        RETURN(0);
-        LBUG();
-#if 0
-        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NAME_MAX);
+        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
         if (rc == ENOENT) /* end of dir*/
                 RETURN(+1);
 
         RETURN((-rc));
-#endif
 }
 
-#if 0
-static int osd_zap_it_del(const struct lu_env *env, struct dt_it *di,
-                struct thandle *th)
-{
-        /* PBS: not called from anywhere , shld be removed from Iterator APIs
-         * */
-        LBUG();
-
-        RETURN(0);
-}
-#endif
-
 static struct dt_key *osd_zap_it_key(const struct lu_env *env,
                 const struct dt_it *di)
 {
-        //struct osd_zap_it *it = (struct osd_zap_it *)di;
-        //int rc = 0;
-
+        struct osd_zap_it *it = (struct osd_zap_it *)di;
+        int                rc = 0;
         ENTRY;
-        /* XXX: not impelemented yet */
-        LBUG();
-        RETURN(NULL);
-#if 0
-        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
+
+        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX + 1);
         if (!rc)
                 RETURN((struct dt_key *)it->ozi_name);
         else
                 RETURN(ERR_PTR(-rc));
-#endif
 }
 
 static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di)
 {
-        //struct osd_zap_it *it = (struct osd_zap_it *)di;
-        //int rc = 0;
-
+        struct osd_zap_it *it = (struct osd_zap_it *)di;
+        int                rc;
         ENTRY;
-        /* XXX: not implemented yet */
-        LBUG();
-        RETURN(0);
-#if 0
+
         rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
         if (!rc)
                 RETURN(strlen(it->ozi_name));
         else
                 RETURN(-rc);
-#endif
 }
 
 
@@ -1521,27 +1522,23 @@ static __u64 osd_zap_it_store(const struct lu_env *env, const struct dt_it *di)
 static int osd_zap_it_load(const struct lu_env *env,
                 const struct dt_it *di, __u64 hash)
 {
-        //struct osd_zap_it *it = (struct osd_zap_it *)di;
-        //struct osd_object *obj = it->ozi_obj;
-        //int rc;
-
+        struct osd_zap_it *it = (struct osd_zap_it *)di;
+        struct osd_object *obj = it->ozi_obj;
+        int                rc;
         ENTRY;
-        /* XXX: not implemented yet */
-        LBUG();
-        RETURN(0);
-#if 0
+
         udmu_zap_cursor_init_serialized(it->ozi_zc,  &osd_obj2dev(obj)->od_objset,
                                         udmu_object_get_id(obj->oo_db), hash);
 
         /* same as osd_zap_it_next()*/
-        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
+        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX + 1);
         if (rc == 0)
                 RETURN(+1);
+
         if (rc == ENOENT) /* end of dir*/
                 RETURN(0);
 
         RETURN(-rc);
-#endif
 }
 
 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
@@ -1553,14 +1550,14 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
         struct lu_fid_pack *pack;
         struct lu_fid *fid;
         dmu_buf_t *zapdb = obj->oo_db;
-        //dmu_buf_t *db;
         uint64_t oid;
         int rc;
         ENTRY;
 
         LASSERT(udmu_object_is_zap(obj->oo_db));
 
-        if (osd_object_is_root(obj)) {
+        /* XXX: to decide on format of / yet */
+        if (0 && osd_object_is_root(obj)) {
                 rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key, &oid,
                                      sizeof(uint64_t), sizeof(uint64_t));
                 if (rc) {
@@ -1577,7 +1574,8 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
                 rc = udmu_zap_lookup(&osd->od_objset, zapdb, (char *) key,
                                      rec, 17, 1);
         }
-        RETURN(-rc);
+
+        RETURN(rc == 0 ? 1 : -rc);
 }
 
 static int osd_declare_index_insert(const struct lu_env *env,
@@ -1728,6 +1726,10 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 {
         struct osd_object *obj  = osd_dt_obj(dt);
         LASSERT(obj->oo_db != NULL);
+        /*
+         * XXX: implement support for fixed-size keys sorted with natural
+         *      numerical way (not using internal hash value)
+         */
         if (udmu_object_is_zap(obj->oo_db))
                 dt->do_index_ops = &osd_index_ops;
         return 0;
@@ -1803,11 +1805,11 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_buf *buf, const char *name,
                 struct lustre_capa *capa)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
-        struct osd_device *osd = osd_obj2dev(obj);
-        int rc, size;
-
+        struct osd_object  *obj  = osd_dt_obj(dt);
+        struct osd_device  *osd = osd_obj2dev(obj);
+        int                 rc, size;
         ENTRY;
+
         LASSERT(obj->oo_db != NULL);
         LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
@@ -1817,9 +1819,9 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                              buf->lb_len, name, &size);
         up(&obj->oo_guard);
 
-        if(rc == -ENOENT)
+        if (rc == -ENOENT)
                 rc = -ENODATA;
-        if(rc == 0)
+        if (rc == 0)
                 rc = size;
         RETURN(rc);
 }
@@ -1834,16 +1836,23 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LASSERT(handle != NULL);
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_tx != NULL);
+
+        if (!dt_object_exists(dt)) {
+                udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
+                udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
+                RETURN(0);
+        }
+
         LASSERT(obj->oo_db != NULL);
         LASSERT(dt_object_exists(dt));
         LASSERT(osd_invariant(obj));
 
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_tx != NULL);
 
         down(&obj->oo_guard);
-        udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen, name,
-                               oh->ot_tx);
+        udmu_xattr_declare_set(&osd->od_objset, obj->oo_db, buflen,
+                               name, oh->ot_tx);
         up(&obj->oo_guard);
 
         RETURN(0);
@@ -1857,9 +1866,9 @@ int osd_xattr_set(const struct lu_env *env,
         struct osd_object  *obj = osd_dt_obj(dt);
         struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oh;
-        int rc;
-
+        int rc = 0;
         ENTRY;
+
         LASSERT(handle != NULL);
         LASSERT(obj->oo_db != NULL);
         LASSERT(osd_invariant(obj));
@@ -1906,9 +1915,9 @@ int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
         struct osd_object  *obj = osd_dt_obj(dt);
         struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oh;
-        int rc;
-
+        int                 rc;
         ENTRY;
+
         LASSERT(handle != NULL);
         LASSERT(obj->oo_db != NULL);
         LASSERT(osd_invariant(obj));
@@ -1927,18 +1936,18 @@ int osd_xattr_list(const struct lu_env *env,
                 struct dt_object *dt, struct lu_buf *buf,
                 struct lustre_capa *capa)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
-        struct osd_device *osd = osd_obj2dev(obj);
-        int rc;
-
+        struct osd_object  *obj = osd_dt_obj(dt);
+        struct osd_device  *osd = osd_obj2dev(obj);
+        int                 rc;
         ENTRY;
+
         LASSERT(obj->oo_db != NULL);
         LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
 
         down(&obj->oo_guard);
-        rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db, buf->lb_buf,
-                              buf->lb_len);
+        rc = -udmu_xattr_list(&osd->od_objset, obj->oo_db,
+                              buf->lb_buf, buf->lb_len);
         up(&obj->oo_guard);
 
         RETURN(rc);
@@ -2088,6 +2097,41 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
         RETURN(oc);
 }
 
+static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
+{
+        ENTRY;
+
+        /* XXX: not implemented yet, important for COS */
+
+        RETURN(0);
+}
+
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+                                               struct dt_object *dt)
+{
+        /* XXX: not implemented yet, important for VBR */
+        return 0;
+}
+
+/*
+ * Set the 64-bit version and return the old version.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+                                   dt_obj_version_t new_version)
+{
+        /* XXX: not implemented yet, important for VBR */
+        return;
+}
+
+static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data)
+{
+        ENTRY;
+
+        /* XXX: not implemented yet, important for quota */
+        *data = NULL;
+        RETURN(0);
+}
+
 static struct dt_object_operations osd_obj_ops = {
         .do_read_lock         = osd_object_read_lock,
         .do_write_lock        = osd_object_write_lock,
@@ -2113,6 +2157,10 @@ static struct dt_object_operations osd_obj_ops = {
         .do_xattr_del         = osd_xattr_del,
         .do_xattr_list        = osd_xattr_list,
         .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_version_get       = osd_object_version_get,
+        .do_version_set       = osd_object_version_set,
+        .do_data_get          = osd_data_get,
 };
 
 /*
@@ -2455,6 +2503,8 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d,
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
 {
         ENTRY;
+        if (o->od_dt_dev.dd_lu_dev.ld_site)
+                lu_site_purge(env, o->od_dt_dev.dd_lu_dev.ld_site, ~0);
 
         if (o->od_objdir_db)
                 udmu_object_put_dmu_buf(o->od_objdir_db, objdir_tag);
@@ -2675,7 +2725,16 @@ static int osd_prepare(const struct lu_env *env,
                        struct lu_device *pdev,
                        struct lu_device *dev)
 {
-        RETURN(0);
+        int rc = 0;
+        ENTRY;
+
+        if (lu_device_is_md(pdev)) {
+                /* 2. setup local objects */
+                rc = llo_local_objects_setup(env, lu2md_dev(pdev),
+                                             lu2dt_dev(dev));
+        }
+
+        RETURN(rc);
 }
 
 /*