Whamcloud - gitweb
On a server, a file system object is uniquely identified by a fid, which is
authornikita <nikita>
Sat, 18 Oct 2008 17:14:45 +0000 (17:14 +0000)
committernikita <nikita>
Sat, 18 Oct 2008 17:14:45 +0000 (17:14 +0000)
sufficient to locate and load all object state (inode). On a client, on the
other hand, more data are necessary instantiate an object. Change
lu_object_find() and friends to take additional `lu_conf' argument describing
object.  Typically this includes layout information.
b=16450

14 files changed:
lustre/ChangeLog
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/cmm/mdc_device.c
lustre/cmm/mdc_object.c
lustre/fid/fid_handler.c
lustre/fld/fld_handler.c
lustre/include/lu_object.h
lustre/include/md_object.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/obdclass/dt_object.c
lustre/obdclass/lu_object.c
lustre/osd/osd_handler.c

index ba62a48..e6046ad 100644 (file)
@@ -1591,6 +1591,16 @@ Bugzilla   : 16450
 Description: Constify instances of struct lsm_operations.
 Details    : Constify instances of struct lsm_operations.
 
+Severity   : normal
+Bugzilla   : 16450
+Description: lu_conf support.
+Details    : On a server, a file system object is uniquely identified
+            by a fid, which is sufficient to locate and load all object
+            state (inode). On a client, on the other hand, more data are
+            necessary instantiate an object. Change lu_object_find() and
+            friends to take additional `lu_conf' argument describing object.
+            Typically this includes layout information.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
index 0d9ac93..910e5c0 100644 (file)
@@ -159,7 +159,8 @@ static void cml_object_free(const struct lu_env *env,
         OBD_FREE_PTR(clo);
 }
 
-static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
+static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
+                           const struct lu_object_conf *_)
 {
         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
         struct lu_device  *c_dev;
@@ -503,30 +504,11 @@ static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
-/* rename is split to local/remote by location of new parent dir */
-struct md_object *md_object_find(const struct lu_env *env,
-                                 struct md_device *md,
-                                 const struct lu_fid *f)
-{
-        struct lu_object *o;
-        struct md_object *m;
-        ENTRY;
-
-        o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
-        if (IS_ERR(o))
-                m = (struct md_object *)o;
-        else {
-                o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
-                m = o ? lu2md(o) : NULL;
-        }
-        RETURN(m);
-}
-
 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
                         const struct lu_fid *lf, struct md_attr *ma,
                         int *remote)
 {
-        struct md_object *mo_s = md_object_find(env, md, lf);
+        struct md_object *mo_s = md_object_find_slice(env, md, lf);
         struct cmm_thread_info *cmi;
         struct md_attr *tmp_ma;
         int rc;
@@ -558,7 +540,7 @@ static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
                             const struct lu_fid *lf, struct md_attr *ma)
 {
-        struct md_object *mo_s = md_object_find(env, md, lf);
+        struct md_object *mo_s = md_object_find_slice(env, md, lf);
         int rc;
         ENTRY;
 
@@ -619,7 +601,7 @@ static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
                 /* XXX: mo_t is remote object and there is RPC to unlink it.
                  * before that, do local sanity check for rename first. */
                 if (!remote) {
-                        struct md_object *mo_s = md_object_find(env,
+                        struct md_object *mo_s = md_object_find_slice(env,
                                                         md_obj2dev(mo_po), lf);
                         if (IS_ERR(mo_s))
                                 RETURN(PTR_ERR(mo_s));
@@ -815,7 +797,8 @@ static void cmr_object_free(const struct lu_env *env,
         OBD_FREE_PTR(cro);
 }
 
-static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
+static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
+                           const struct lu_object_conf *_)
 {
         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
         struct lu_device  *c_dev;
index 1d105fb..361b38d 100644 (file)
@@ -241,17 +241,7 @@ struct cmm_object *cmm_object_find(const struct lu_env *env,
                                    struct cmm_device *d,
                                    const struct lu_fid *f)
 {
-        struct lu_object *o;
-        struct cmm_object *m;
-        ENTRY;
-
-        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
-        if (IS_ERR(o))
-                m = (struct cmm_object *)o;
-        else
-                m = lu2cmm_obj(lu_object_locate(o->lo_header,
-                               d->cmm_md_dev.md_lu_dev.ld_type));
-        RETURN(m);
+        return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
 }
 
 static inline void cmm_object_put(const struct lu_env *env,
index d5518f6..568d608 100644 (file)
@@ -104,7 +104,7 @@ static int mdc_obd_add(const struct lu_env *env,
         const char *uuid_str = lustre_cfg_string(cfg, 1);
         const char *index = lustre_cfg_string(cfg, 2);
         const char *mdc_uuid_str = lustre_cfg_string(cfg, 4);
-        struct lu_site *ls = mdc2lu_dev(mc)->ld_site;
+        struct md_site *ms = lu_site2md(mdc2lu_dev(mc)->ld_site);
         char *p;
         int rc = 0;
 
@@ -162,7 +162,7 @@ static int mdc_obd_add(const struct lu_env *env,
                         desc->cl_exp = class_conn2export(conn);
                         /* set seq controller export for MDC0 if exists */
                         if (mc->mc_num == 0)
-                                ls->ls_control_exp = 
+                                ms->ms_control_exp =
                                         class_export_get(desc->cl_exp);
                         rc = obd_fid_init(desc->cl_exp);
                         if (rc)
index 70a1c3e..d58e47b 100644 (file)
@@ -86,23 +86,17 @@ static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
         OBD_FREE_PTR(mco);
 }
 
-static int mdc_object_init(const struct lu_env *env, struct lu_object *lo)
+static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
+                           const struct lu_object_conf *_)
 {
         ENTRY;
         lo->lo_header->loh_attr |= LOHA_REMOTE;
         RETURN(0);
 }
 
-static int mdc_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *lo)
-{
-       return (*p)(env, cookie, LUSTRE_CMM_MDC_NAME"-object@%p", lo);
-}
-
-static struct lu_object_operations mdc_obj_ops = {
+static const struct lu_object_operations mdc_obj_ops = {
         .loo_object_init    = mdc_object_init,
         .loo_object_free    = mdc_object_free,
-       .loo_object_print   = mdc_object_print,
 };
 
 /* md_object_operations */
index 4725884..d52b770 100644 (file)
@@ -302,25 +302,27 @@ static int seq_server_handle(struct lu_site *site,
                              struct lu_range *out)
 {
         int rc;
+        struct md_site *mite;
         ENTRY;
 
+        mite = lu_site2md(site);
         switch (opc) {
         case SEQ_ALLOC_META:
-                if (!site->ls_server_seq) {
+                if (!mite->ms_server_seq) {
                         CERROR("Sequence server is not "
                                "initialized\n");
                         RETURN(-EINVAL);
                 }
-                rc = seq_server_alloc_meta(site->ls_server_seq,
+                rc = seq_server_alloc_meta(mite->ms_server_seq,
                                            in, out, env);
                 break;
         case SEQ_ALLOC_SUPER:
-                if (!site->ls_control_seq) {
+                if (!mite->ms_control_seq) {
                         CERROR("Sequence controller is not "
                                "initialized\n");
                         RETURN(-EINVAL);
                 }
-                rc = seq_server_alloc_super(site->ls_control_seq,
+                rc = seq_server_alloc_super(mite->ms_control_seq,
                                             in, out, env);
                 break;
         default:
index 420ad42..3138a54 100644 (file)
@@ -229,7 +229,7 @@ static int fld_req_handle(struct ptlrpc_request *req,
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
-                rc = fld_server_handle(site->ls_server_fld,
+                rc = fld_server_handle(lu_site2md(site)->ms_server_fld,
                                        req->rq_svc_thread->t_env,
                                        *opc, out, info);
         } else
@@ -293,16 +293,18 @@ EXPORT_SYMBOL(fld_query);
 int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
 {
         int result;
+        struct md_site *msite;
 
         result = 1; /* conservatively assume fid is local */
-        if (site->ls_client_fld != NULL) {
+        msite = lu_site2md(site);
+        if (msite->ms_client_fld != NULL) {
                 mdsno_t mds;
                 int rc;
 
-                rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
+                rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
                                       fid_seq(fid), &mds);
                 if (rc == 0)
-                        result = (mds == site->ls_node_id);
+                        result = (mds == msite->ms_node_id);
         }
         return result;
 }
index 293cc28..26ab509 100644 (file)
@@ -159,6 +159,14 @@ struct lu_device_operations {
 };
 
 /**
+ * Object configuration, describing particulars of object being created. On
+ * server this is not used, as server objects are full identified by fid. On
+ * client configuration contains struct lustre_md.
+ */
+struct lu_object_conf {
+};
+
+/**
  * Type of "printer" function used by lu_object_operations::loo_object_print()
  * method.
  *
@@ -710,37 +718,31 @@ static inline int lu_object_is_dying(const struct lu_object_header *h)
         return test_bit(LU_OBJECT_HEARD_BANSHEE, &h->loh_flags);
 }
 
-/*
- * Decrease reference counter on object. If last reference is freed, return
- * object to the cache, unless lu_object_is_dying(o) holds. In the latter
- * case, free object immediately.
- */
-void lu_object_put(const struct lu_env *env,
-                   struct lu_object *o);
+void lu_object_put(const struct lu_env *env, struct lu_object *o);
 
-/*
- * Free @nr objects from the cold end of the site LRU list.
- */
 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr);
 
-/*
- * Print all objects in @s.
- */
 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                    lu_printer_t printer);
-/*
- * Search cache for an object with the fid @f. If such object is found, return
- * it. Otherwise, create new object, insert it into cache and return it. In
- * any case, additional reference is acquired on the returned object.
- */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f);
-
-/*
+                                 struct lu_device *dev, const struct lu_fid *f,
+                                 const struct lu_object_conf *conf);
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                    struct lu_device *dev,
+                                    const struct lu_fid *f,
+                                    const struct lu_object_conf *conf);
+struct lu_object *lu_object_find_slice(const struct lu_env *env,
+                                       struct lu_device *dev,
+                                       const struct lu_fid *f,
+                                       const struct lu_object_conf *conf);
+/** @} caching */
+
+/** \name helpers
  * Helpers.
+ * @{
  */
 
-/*
+/**
  * First (topmost) sub-object of given compound object
  */
 static inline struct lu_object *lu_object_top(struct lu_object_header *h)
index 4546089..f6166e1 100644 (file)
@@ -425,7 +425,15 @@ static inline void md_device_fini(struct md_device *md)
        lu_device_fini(&md->md_lu_dev);
 }
 
-/* md operations */
+static inline struct md_object *md_object_find_slice(const struct lu_env *env,
+                                                     struct md_device *md,
+                                                     const struct lu_fid *f)
+{
+        return lu2md(lu_object_find_slice(env, md2lu_dev(md), f, NULL));
+}
+
+
+/** md operations */
 static inline int mo_permission(const struct lu_env *env,
                                 struct md_object *p,
                                 struct md_object *c,
index f057b2e..4a50879 100644 (file)
@@ -175,7 +175,8 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env,
         }
 }
 
-static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
+static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
+                           const struct lu_object_conf *_)
 {
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct lu_object  *below;
@@ -208,15 +209,8 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
         OBD_FREE_PTR(mdd);
 }
 
-static int mdd_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *o)
-{
-        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
-}
-
 /* orphan handling is here */
-static void mdd_object_delete(const struct lu_env *env,
-                               struct lu_object *o)
+static void mdd_object_delete(const struct lu_env *env, struct lu_object *o)
 {
         struct mdd_object *mdd_obj = lu2mdd_obj(o);
         struct thandle *handle = NULL;
@@ -254,21 +248,7 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
                                    struct mdd_device *d,
                                    const struct lu_fid *f)
 {
-        struct lu_object *o, *lo;
-        struct mdd_object *m;
-        ENTRY;
-
-        o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
-        if (IS_ERR(o))
-                m = (struct mdd_object *)o;
-        else {
-                lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
-                /* remote object can't be located and should be put then */
-                if (lo == NULL)
-                        lu_object_put(env, o);
-                m = lu2mdd_obj(lo);
-        }
-        RETURN(m);
+        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
 }
 
 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
index 366355d..3667afa 100644 (file)
@@ -1768,7 +1768,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         ENTRY;
 
         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
-        o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
+        o = lu_object_find(env, &d->mdt_md_dev.md_lu_dev, f, NULL);
         if (unlikely(IS_ERR(o)))
                 m = (struct mdt_object *)o;
         else
@@ -3681,7 +3681,7 @@ static void mdt_stack_fini(const struct lu_env *env,
         m->mdt_bottom = NULL;
 }
 
-static struct lu_device *mdt_layer_setup(const struct lu_env *env,
+static struct lu_device *mdt_layer_setup(struct lu_env *env,
                                          const char *typename,
                                          struct lu_device *child,
                                          struct lustre_cfg *cfg)
@@ -3749,7 +3749,7 @@ out:
         return ERR_PTR(rc);
 }
 
-static int mdt_stack_init(const struct lu_env *env,
+static int mdt_stack_init(struct lu_env *env,
                           struct mdt_device *m, struct lustre_cfg *cfg)
 {
         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
@@ -3991,7 +3991,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         }
 
         /* init the stack */
-        rc = mdt_stack_init(env, m, cfg);
+        rc = mdt_stack_init((struct lu_env *)env, m, cfg);
         if (rc) {
                 CERROR("Can't init device stack, rc %d\n", rc);
                 GOTO(err_fini_proc, rc);
@@ -4192,7 +4192,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 RETURN(NULL);
 }
 
-static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
+static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
+                           const struct lu_object_conf *_)
 {
         struct mdt_device *d = mdt_dev(o->lo_dev);
         struct lu_device  *under;
index c429bd2..2a99005 100644 (file)
@@ -185,7 +185,7 @@ static struct dt_object *dt_locate(const struct lu_env *env,
         struct lu_object *obj;
         struct dt_object *dt;
 
-        obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid);
+        obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
         if (!IS_ERR(obj)) {
                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
                 LASSERT(obj != NULL);
index f7efd24..f8b7ea7 100644 (file)
@@ -123,15 +123,16 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 }
 EXPORT_SYMBOL(lu_object_put);
 
-/*
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
  * struct lu_device_operations definition.
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
-                                         struct lu_site *s,
-                                         const struct lu_fid *f)
+                                         struct lu_device *dev,
+                                         const struct lu_fid *f,
+                                         const struct lu_object_conf *conf)
 {
         struct lu_object *scan;
         struct lu_object *top;
@@ -144,8 +145,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
          * Create top-level object slice. This will also create
          * lu_object_header.
          */
-        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
-                                                      NULL, s->ls_top_dev);
+        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
         if (top == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
         /*
@@ -166,7 +166,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                 continue;
                         clean = 0;
                         scan->lo_header = top->lo_header;
-                        result = scan->lo_ops->loo_object_init(env, scan);
+                        result = scan->lo_ops->loo_object_init(env, scan, conf);
                         if (result != 0) {
                                 lu_object_free(env, top);
                                 RETURN(ERR_PTR(result));
index 0269a07..8fedb2c 100644 (file)
@@ -117,7 +117,8 @@ static int   osd_mod_init      (void) __init;
 static int   osd_type_init     (struct lu_device_type *t);
 static void  osd_type_fini     (struct lu_device_type *t);
 static int   osd_object_init   (const struct lu_env *env,
-                                struct lu_object *l);
+                                struct lu_object *l,
+                                const struct lu_object_conf *_);
 static void  osd_object_release(const struct lu_env *env,
                                 struct lu_object *l);
 static int   osd_object_print  (const struct lu_env *env, void *cookie,
@@ -337,7 +338,8 @@ static void osd_object_init0(struct osd_object *obj)
  * Concurrency: no concurrent access is possible that early in object
  * life-cycle.
  */
-static int osd_object_init(const struct lu_env *env, struct lu_object *l)
+static int osd_object_init(const struct lu_env *env, struct lu_object *l,
+                           const struct lu_object_conf *_)
 {
         struct osd_object *obj = osd_obj(l);
         int result;
@@ -2133,7 +2135,7 @@ static int osd_index_compat_insert(const struct lu_env *env,
         if (result != 0)
                 return result;
 
-        luch = lu_object_find(env, ludev->ld_site, fid);
+        luch = lu_object_find(env, ludev, fid, NULL);
         if (!IS_ERR(luch)) {
                 if (lu_object_exists(luch)) {
                         struct osd_object *child;