Whamcloud - gitweb
add a branch and its table named "b_iam" for "Index API Module" from #colibri
[fs/lustre-release.git] / lustre / cmobd / cm_obd.c
index 15bbb36..417ef9e 100644 (file)
@@ -32,6 +32,7 @@
 #include <linux/lustre_cmobd.h>
 #include <linux/lprocfs_status.h>
 #include <linux/obd_lov.h>
+#include <linux/obd_ost.h>
 #include <linux/obd_lmv.h>
 
 #include "cm_internal.h"
@@ -50,64 +51,68 @@ static int cmobd_detach(struct obd_device *obd)
         return lprocfs_obd_detach(obd);
 }
 
-static inline int cmobd_lmv_obd(struct obd_device *obd)
+static int cmobd_init_dt_desc(struct obd_device *obd)
 {
-        if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) ||
-            !strcmp(obd->obd_type->typ_name, OBD_LMV_DEVICENAME))
-                return 1;
-
-        return 0;
-}
-
-static inline int cmobd_lov_obd(struct obd_device *obd)
-{
-        if (!strcmp(obd->obd_type->typ_name, OBD_LOV_DEVICENAME))
-                return 1;
-
-        return 0;
+        struct cm_obd *cmobd = &obd->u.cm;
+        __u32 valsize;
+        int rc = 0;
+        ENTRY;
+        
+        /* as CMOBD is stand alone device, that is has not to be connected, we
+         * have no other way to init EAs correctly but ask master device about
+         * it. Thus both, DT and MD layers should be able to answer with correct
+         * lov_desc. LOV knows it explicitly and LMV/MDC have to ask MDS server
+         * of it. */
+        valsize = sizeof(cmobd->master_desc);
+        memset(&cmobd->master_desc, 0, sizeof(cmobd->master_desc));
+
+        rc = obd_get_info(cmobd->master_exp, strlen("lovdesc") + 1,
+                          "lovdesc", &valsize, &cmobd->master_desc);
+        RETURN(rc);
 }
-
-static void cmobd_init_ea_size(struct obd_device *obd)
+        
+static int cmobd_init_ea_size(struct obd_device *obd)
 {
-        int ost_count = 1, easize, cookiesize;
+        int rc = 0, tgt_count, easize, cookiesize;
         struct cm_obd *cmobd = &obd->u.cm;
         ENTRY;
 
-        /*
-         * here we should also take into account that there is possible to have
-         * few OSTs. --umka
-         */
-        easize = lov_mds_md_size(ost_count);
-        cookiesize = ost_count * sizeof(struct llog_cookie);
+        if (!cmobd->master_exp)
+                RETURN(-EINVAL);
 
-        obd_init_ea_size(cmobd->master_exp, easize, cookiesize);
+        tgt_count = cmobd->master_desc.ld_tgt_count;
 
-        cmobd->master_exp->exp_obd->u.cli.cl_max_mds_easize = easize;
-        cmobd->master_exp->exp_obd->u.cli.cl_max_mds_cookiesize = cookiesize;
-        
-        EXIT;
+        /* no EA setup is needed as there is single OST with no LOV */
+        if (tgt_count == 0)
+                RETURN(0);
+
+        easize = lov_mds_md_size(tgt_count);
+        cookiesize = tgt_count * sizeof(struct llog_cookie);
+        rc = obd_init_ea_size(cmobd->master_exp, easize, cookiesize);
+        RETURN(rc);
 }
 
+static char *types[] = {
+        OBD_LMV_DEVICENAME, OBD_MDC_DEVICENAME,
+        OBD_LOV_DEVICENAME, OBD_OSC_DEVICENAME
+};
+
 static struct obd_device *
-find_master_obd(struct obd_device *obd, struct obd_uuid *uuid)
+cmobd_find_obd(struct obd_device *obd, struct obd_uuid *uuid)
 {
-        struct obd_device *master;
+        struct obd_device *res;
+        int i = 0;
+        ENTRY;
 
         CWARN("%s: looking for client obd %s\n",
               obd->obd_uuid.uuid, uuid->uuid);
 
-        master = class_find_client_obd(NULL, OBD_LOV_DEVICENAME,
-                                       uuid);
-        if (master)
-                return master;
-
-        master = class_find_client_obd(NULL, OBD_LMV_DEVICENAME,
-                                       uuid);
-        if (master)
-                return master;
-
-        return class_find_client_obd(NULL, LUSTRE_MDC_NAME,
-                                     uuid);
+        for (i = 0; i < sizeof(types) / sizeof(char *); i++) {
+                res = class_find_client_obd(NULL, types[i], uuid);
+                if (res)
+                        RETURN(res);
+        }
+        RETURN(NULL);
 }
 
 static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
@@ -116,8 +121,6 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
         struct lustre_handle conn = { 0 };
         struct cm_obd *cmobd = &obd->u.cm;
         struct lustre_cfg* lcfg = buf;
-        struct lustre_id mid, lid;
-        __u32 valsize;
         int rc;
         ENTRY;
 
@@ -137,9 +140,9 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
         obd_str2uuid(&cache_uuid, lustre_cfg_string(lcfg, 2));
 
         /* getting master obd */
-        cmobd->master_obd = find_master_obd(obd, &master_uuid);
+        cmobd->master_obd = cmobd_find_obd(obd, &master_uuid);
         if (!cmobd->master_obd) {
-                CERROR("can't find master obd by uuid %s\n",
+                CERROR("can't find master client obd by uuid %s\n",
                        master_uuid.uuid);
                 RETURN(-EINVAL);
         }
@@ -166,9 +169,19 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(put_master, rc);
         cmobd->cache_exp = class_conn2export(&conn);
-        
-        if (cmobd_lov_obd(cmobd->master_exp->exp_obd)) {
-                /* for master osc remove the recovery flag. */
+
+        /* initialing DT desc. Both, data and metadata layers should be able to
+         * serve this call. */
+        rc = cmobd_init_dt_desc(obd);
+        if (rc != 0 && rc != -EPROTO) {
+                CERROR("cannot get DT layer desc from master device %s, "
+                       "err %d.\n", cmobd->master_exp->exp_obd->obd_name,
+                       rc);
+                GOTO(put_cache, rc);
+        }
+
+        if (obd_dt_type(cmobd->master_exp->exp_obd)) {
+                /* for master dt device remove the recovery flag. */
                 rc = obd_set_info(cmobd->master_exp, strlen("unrecovery"),
                                   "unrecovery", 0, NULL); 
                 if (rc)
@@ -177,66 +190,45 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
                 rc = cmobd_init_write_srv(obd);
                 if (rc)
                         GOTO(put_cache, rc);
-        } else {
-                cmobd_init_ea_size(obd);
-                cmobd->write_srv = NULL;
         }
 
-        if (cmobd_lmv_obd(cmobd->master_exp->exp_obd)) {
-                /*
-                 * making sure, that both obds are ready. This is especially
-                 * important in the case of using LMV as master.
-                 */
-                rc = obd_getready(cmobd->master_exp);
-                if (rc) {
-                        CERROR("can't get %s obd ready.\n",
-                               master_uuid.uuid);
-                        GOTO(put_cache, rc);
-                }
-        
-                rc = obd_getready(cmobd->cache_exp);
-                if (rc) {
-                        CERROR("can't get %s obd ready.\n",
-                               cache_uuid.uuid);
-                        GOTO(put_cache, rc);
-                }
-        
-                /*
-                 * requesting master obd to have its root inode store cookie to
-                 * be able to save it to local root inode EA.
-                 */
-                valsize = sizeof(struct lustre_id);
-        
-                rc = obd_get_info(cmobd->master_exp, strlen("rootid"),
-                                  "rootid", &valsize, &mid);
+        if (obd_md_type(cmobd->master_exp->exp_obd)) {
+                __u32 size = sizeof(struct fid_extent);
+                struct fid_extent ext;
+                
+                rc = cmobd_init_ea_size(obd);
                 if (rc) {
-                        CERROR("can't get rootid from master MDS %s, "
-                               "err= %d.\n", master_uuid.uuid, rc);
+                        CERROR("can't init MD layer EA size, "
+                               "err %d\n", rc);
                         GOTO(put_cache, rc);
                 }
+                cmobd->write_srv = NULL;
 
-                /*
-                 * getting rootid from cache MDS. It is needed to update local
-                 * (cache) root inode by rootid value from master obd.
-                 */
-                rc = obd_get_info(cmobd->cache_exp, strlen("rootid"),
-                                  "rootid", &valsize, &lid);
+                /* getting fid pool from master to set it on cache */
+                rc = obd_get_info(cmobd->master_exp, strlen("getext"),
+                                  "getext", &size, &ext);
                 if (rc) {
-                        CERROR("can't get rootid from local MDS %s, "
-                               "err= %d.\n", cache_uuid.uuid, rc);
+                        CERROR("can't get fids extent from master, "
+                               "err %d\n", rc);
                         GOTO(put_cache, rc);
                 }
 
-                /* storing master MDS rootid to local root inode EA. */
-                CWARN("storing "DLID4" to local inode "DLID4".\n",
-                      OLID4(&mid), OLID4(&lid));
+                /* simple checks for validness */
+                if (!ext.fe_start || !ext.fe_width || ext.fe_start == ext.fe_width) {
+                        CERROR("invalid fids extent from master, ["LPD64"-"LPD64"]\n", 
+                              ext.fe_start, ext.fe_width);
+                        GOTO(put_cache, rc = -EINVAL);
+                }
 
-                rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
-                                    &mid, sizeof(mid));
+                CWARN("setting master fids extent ["LPD64"-"LPD64
+                      "] -> %s\n", ext.fe_start, ext.fe_width,
+                      cmobd->cache_exp->exp_obd->obd_name);
+                
+                rc = obd_set_info(cmobd->cache_exp, strlen("setext"),
+                                  "setext", size, &ext); 
                 if (rc) {
-                        CERROR("can't update local root inode by ID "
-                               "from master MDS %s, err = %d.\n",
-                               master_uuid.uuid, rc);
+                        CERROR("can't set fids extent to cache, "
+                               "err %d\n", rc);
                         GOTO(put_cache, rc);
                 }
         }
@@ -255,19 +247,21 @@ static int cmobd_cleanup(struct obd_device *obd, int flags)
         int rc;
         ENTRY;
 
-        if (cmobd->write_srv)
+        if (cmobd->write_srv) {
                 cmobd_cleanup_write_srv(obd);
+                cmobd->write_srv = NULL;
+        }
 
         rc = obd_disconnect(cmobd->master_exp, flags);
         if (rc) {
-                CERROR("error disconnecting master, err %d\n",
-                       rc);
+                CERROR("error disconnecting master %s, err %d\n",
+                       cmobd->master_exp->exp_obd->obd_name, rc);
         }
         
         rc = class_disconnect(cmobd->cache_exp, flags);
         if (rc) {
-                CERROR("error disconnecting cache, err %d\n",
-                       rc);
+                CERROR("error disconnecting cache %s, err %d\n",
+                       cmobd->cache_exp->exp_obd->obd_name, rc);
         }
         
         RETURN(0);
@@ -281,7 +275,11 @@ static int cmobd_iocontrol(unsigned int cmd, struct obd_export *exp,
         ENTRY;
         
         switch (cmd) {
-        case OBD_IOC_CMOBD_SYNC: /* trigger reintegration */
+        case OBD_IOC_CMOBD_SYNC:
+                /* here would be nice to make sure somehow that all data is in
+                 * cache and there are no outstanding requests, as otherwise
+                 * cache is not coherent. But how to check that from CMOBD? I do
+                 * not know. --umka */
                 rc = cmobd_reintegrate(obd);
                 break;
         default:
@@ -314,14 +312,14 @@ static int __init cmobd_init(void)
 
         lprocfs_init_vars(cmobd, &lvars);
         rc = class_register_type(&cmobd_ops, NULL, lvars.module_vars,
-                                 LUSTRE_CMOBD_NAME);
+                                 OBD_CMOBD_DEVICENAME);
         if (rc)
                 RETURN(rc);
         cmobd_extent_slab = kmem_cache_create("cmobd_extents",
-                                               sizeof(struct cmobd_extent_info), 0,
-                                               SLAB_HWCACHE_ALIGN, NULL, NULL);
+                                              sizeof(struct cmobd_extent_info), 0,
+                                              SLAB_HWCACHE_ALIGN, NULL, NULL);
         if (cmobd_extent_slab == NULL) {
-                class_unregister_type(LUSTRE_CMOBD_NAME);
+                class_unregister_type(OBD_CMOBD_DEVICENAME);
                 RETURN(-ENOMEM);
         }
         RETURN(0);
@@ -329,7 +327,7 @@ static int __init cmobd_init(void)
 
 static void __exit cmobd_exit(void)
 {
-        class_unregister_type(LUSTRE_CMOBD_NAME);
+        class_unregister_type(OBD_CMOBD_DEVICENAME);
         if (kmem_cache_destroy(cmobd_extent_slab) != 0)
                 CERROR("couldn't free cmobd extent slab\n");
 }