Whamcloud - gitweb
cmm: replace some dynamic allocations with context key values.
authornikita <nikita>
Fri, 3 Nov 2006 12:32:31 +0000 (12:32 +0000)
committernikita <nikita>
Fri, 3 Nov 2006 12:32:31 +0000 (12:32 +0000)
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_split.c

index a7dae08..a199c03 100644 (file)
@@ -109,11 +109,20 @@ struct cmr_object {
         mdsno_t           cmo_num;
 };
 
+enum {
+        CMM_SPLIT_PAGE_COUNT = 1
+};
+
 struct cmm_thread_info {
-        struct md_attr  cmi_ma;
-        struct lu_buf   cmi_buf;
-        struct lu_fid   cmi_fid; /* used for le/cpu convertions */
-        char            cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+        struct md_attr        cmi_ma;
+        struct lu_buf         cmi_buf;
+        struct lu_fid         cmi_fid; /* used for le/cpu conversions */
+        struct lu_rdpg        cmi_rdpg;
+        /* pointers to pages for readpage. */
+        struct page          *cmi_pages[CMM_SPLIT_PAGE_COUNT];
+        struct md_create_spec cmi_spec;
+        struct lmv_stripe_md  cmi_lmv;
+        char                  cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 };
 
 static inline struct cmm_device *cmm_obj2dev(struct cmm_object *c)
index 0ad8acd..9b60ace 100644 (file)
@@ -45,8 +45,6 @@ enum {
         CMM_SPLIT_SIZE =  64 * 1024
 };
 
-#define CMM_SPLIT_PAGE_COUNT 1
-
 /*
  * This function checks if passed @name come to correct server (local MDT). If
  * not - return -ERESTART and let client know that dir was split and client
@@ -59,7 +57,7 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
         struct cml_object *clo = md2cml_obj(mp);
         int rc;
         ENTRY;
-        
+
         /* not split yet */
         if (clo->clo_split == CMM_SPLIT_NONE ||
             clo->clo_split == CMM_SPLIT_DENIED)
@@ -79,7 +77,7 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
                         clo->clo_split = CMM_SPLIT_NONE;
                 RETURN(0);
         }
-        
+
         LASSERT(ma->ma_lmv_size > 0);
         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
         if (ma->ma_lmv == NULL)
@@ -96,13 +94,13 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
         if (ma->ma_lmv->mea_count != 0) {
                 int idx;
 
-                /* 
+                /*
                  * Get stripe by name to check the name belongs to master dir,
                  * otherwise return the -ERESTART
                  */
                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
-                
-                /* 
+
+                /*
                  * Check if name came to correct MDT server. We suppose that if
                  * client does not know about split, it sends create operation
                  * to master MDT. And this is master job to say it that dir got
@@ -113,7 +111,7 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
                  */
                 if (idx != 0)
                         rc = -ERESTART;
-                
+
                 /* update split state to DONE if unknown */
                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
                         clo->clo_split = CMM_SPLIT_DONE;
@@ -139,7 +137,7 @@ int cmm_split_access(const struct lu_env *env, struct md_object *mo,
         ENTRY;
 
         memset(ma, 0, sizeof(*ma));
-        
+
         /*
          * Check only if we need protection from split.  If not - mdt handles
          * other cases.
@@ -161,7 +159,7 @@ int cmm_split_access(const struct lu_env *env, struct md_object *mo,
         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
                 RETURN(MDL_EX);
 
-        /* 
+        /*
          * Have no idea about lock mode, let it be what higher layer wants.
          */
         RETURN(MDL_MINMODE);
@@ -199,7 +197,7 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
          * for this get_attr.
          */
-        LASSERT(ma->ma_valid == 0); 
+        LASSERT(ma->ma_valid == 0);
         ma->ma_need = MA_INODE | MA_LMV;
         rc = mo_attr_get(env, mo, ma);
         if (rc)
@@ -217,7 +215,7 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
                 *split = clo->clo_split = CMM_SPLIT_NONE;
                 RETURN(0);
         }
-        
+
         *split = clo->clo_split = CMM_SPLIT_NEEDED;
         RETURN(0);
 }
@@ -249,9 +247,9 @@ static inline void cmm_object_put(const struct lu_env *env,
  * Allocate new on passed @mc for slave object which is going to create there
  * soon.
  */
-static int cmm_split_fid_alloc(const struct lu_env *env, 
+static int cmm_split_fid_alloc(const struct lu_env *env,
                                struct cmm_device *cmm,
-                               struct mdc_device *mc, 
+                               struct mdc_device *mc,
                                struct lu_fid *fid)
 {
         int rc;
@@ -260,7 +258,7 @@ static int cmm_split_fid_alloc(const struct lu_env *env,
         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
 
         down(&mc->mc_fid_sem);
-        
+
         /* Alloc new fid on @mc. */
         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
         if (rc > 0) {
@@ -271,20 +269,20 @@ static int cmm_split_fid_alloc(const struct lu_env *env,
                         CERROR("Can't create fld entry, rc %d\n", rc);
         }
         up(&mc->mc_fid_sem);
-        
+
         RETURN(rc);
 }
 
 /* Allocate new slave object on passed @mc */
-static int cmm_split_slave_create(const struct lu_env *env, 
+static int cmm_split_slave_create(const struct lu_env *env,
                                   struct cmm_device *cmm,
                                   struct mdc_device *mc,
-                                  struct lu_fid *fid, 
+                                  struct lu_fid *fid,
                                   struct md_attr *ma,
                                   struct lmv_stripe_md *lmv,
                                   int lmv_size)
 {
-        struct md_create_spec *spec;
+        struct md_create_spec *spec = &cmm_env_info(env)->cmi_spec;
         struct cmm_object *obj;
         int rc;
         ENTRY;
@@ -302,18 +300,13 @@ static int cmm_split_slave_create(const struct lu_env *env,
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
-        OBD_ALLOC_PTR(spec);
-        if (spec == NULL)
-                RETURN(-ENOMEM);
-
+        memset(spec, 0, sizeof *spec);
         spec->u.sp_ea.fid = fid;
         spec->u.sp_ea.eadata = lmv;
         spec->u.sp_ea.eadatalen = lmv_size;
         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
                               spec, ma);
-        OBD_FREE_PTR(spec);
-
         cmm_object_put(env, obj);
         RETURN(rc);
 }
@@ -328,7 +321,7 @@ static int cmm_split_slaves_create(const struct lu_env *env,
 {
         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
-        struct lmv_stripe_md *slave_lmv = NULL;
+        struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
         struct mdc_device    *mc, *tmp;
         struct lmv_stripe_md *lmv;
         int i = 1, rc = 0;
@@ -340,16 +333,13 @@ static int cmm_split_slaves_create(const struct lu_env *env,
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
 
-        /* 
+        /*
          * Store master FID to local node idx number. Local node is always
          * master and its stripe number if 0.
          */
         lmv->mea_ids[0] = *lf;
 
-        OBD_ALLOC_PTR(slave_lmv);
-        if (slave_lmv == NULL)
-                RETURN(-ENOMEM);
-
+        memset(slave_lmv, 0, sizeof *slave_lmv);
         slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         slave_lmv->mea_count = 0;
@@ -365,7 +355,6 @@ static int cmm_split_slaves_create(const struct lu_env *env,
         ma->ma_valid |= MA_LMV;
         EXIT;
 cleanup:
-        OBD_FREE_PTR(slave_lmv);
         return rc;
 }
 
@@ -390,12 +379,12 @@ static void cmm_split_dump_entry(const struct lu_env *env,
         obj = cmm_object_find(env, cmm, ef);
         if (IS_ERR(obj)) {
                 CERROR("Error while find object by "DFID
-                       ", rc %d\n", ef, (int)PTR_ERR(obj));
+                       ", rc %d\n", PFID(ef), (int)PTR_ERR(obj));
                 return;
         }
 
         local = (lu_object_exists(&obj->cmo_obj.mo_lu) > 0);
-        
+
         printk("%4.4x "DFID" %*.*s/%8.8x [%4.4x] (%s)\n",
                le16_to_cpu(ent->lde_reclen), PFID(ef),
                le16_to_cpu(ent->lde_namelen),
@@ -403,7 +392,7 @@ static void cmm_split_dump_entry(const struct lu_env *env,
                ent->lde_name, le32_to_cpu(ent->lde_hash),
                le16_to_cpu(ent->lde_namelen),
                (local ? "lc" : "cr"));
-        
+
         cmm_object_put(env, obj);
 }
 
@@ -416,12 +405,12 @@ static void cmm_split_dump_page(const struct lu_env *env,
 
         if (le16_to_cpu(dp->ldp_flags) & LDF_EMPTY)
                 return;
-        
+
         printk("Dump: page: [%8.8x-%8.8x]/[%8.8x], flags: "
                "%4.4x\n", le32_to_cpu(dp->ldp_hash_start),
                le32_to_cpu(dp->ldp_hash_end), hash_end,
                le16_to_cpu(dp->ldp_flags));
-        
+
         for (ent = lu_dirent_start(dp);
              ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end;
              ent = lu_dirent_next(ent)) {
@@ -454,7 +443,7 @@ static int cmm_split_remove_entry(const struct lu_env *env,
         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
         else
-                /* 
+                /*
                  * XXX: These days only cross-ref dirs are possible, so for the
                  * sake of simplicity, in split, we suppose that all cross-ref
                  * names pint to directory and do not do additional getattr to
@@ -556,7 +545,7 @@ static int cmm_split_send_page(const struct lu_env *env,
 }
 
 /* Read one page of entries from local MDT. */
-static int cmm_split_read_page(const struct lu_env *env, 
+static int cmm_split_read_page(const struct lu_env *env,
                                struct md_object *mo,
                                struct lu_rdpg *rdpg)
 {
@@ -568,14 +557,14 @@ static int cmm_split_read_page(const struct lu_env *env,
         RETURN(rc);
 }
 
-/* 
+/*
  * This function performs migration of all pages with entries which fit into one
  * stripe and one hash segment.
  */
 
 static int cmm_split_process_stripe(const struct lu_env *env,
                                     struct md_object *mo,
-                                    struct lu_rdpg *rdpg, 
+                                    struct lu_rdpg *rdpg,
                                     struct lu_fid *lf,
                                     __u32 end)
 {
@@ -627,21 +616,15 @@ static int cmm_split_process_dir(const struct lu_env *env,
                                  struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lu_rdpg *rdpg = NULL;
+        struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
         __u32 hash_segement;
         int rc = 0, i;
         ENTRY;
 
-        OBD_ALLOC_PTR(rdpg);
-        if (!rdpg)
-                RETURN(-ENOMEM);
-
+        memset(rdpg, 0, sizeof *rdpg);
         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
-
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
-        if (rdpg->rp_pages == NULL)
-                GOTO(free_rdpg, rc = -ENOMEM);
+        rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
 
         for (i = 0; i < rdpg->rp_npages; i++) {
                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
@@ -668,7 +651,7 @@ static int cmm_split_process_dir(const struct lu_env *env,
                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
                 if (rc) {
                         CERROR("Error (rc = %d) while splitting for %d: fid="
-                               DFID", %08x:%08x\n", rc, i, PFID(lf), 
+                               DFID", %08x:%08x\n", rc, i, PFID(lf),
                                rdpg->rp_hash, hash_end);
                         GOTO(cleanup, rc);
                 }
@@ -678,13 +661,6 @@ cleanup:
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
                         __free_pages(rdpg->rp_pages[i], 0);
-        if (rdpg->rp_pages)
-                OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
-                         sizeof rdpg->rp_pages[0]);
-free_rdpg:
-        if (rdpg)
-                OBD_FREE_PTR(rdpg);
-
         return rc;
 }
 
@@ -714,7 +690,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
         }
 
         la_size = ma->ma_attr.la_size;
-        
+
         /* Split should be done now, let's do it. */
         CWARN("Dir "DFID" is going to split (dir size: "LPU64")\n",
               PFID(lu_object_fid(&mo->mo_lu)), la_size);
@@ -768,7 +744,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
          */
         CWARN("Dir "DFID" has been split (dir size: "LPU64")\n",
               PFID(lu_object_fid(&mo->mo_lu)), la_size);
-        
+
         rc = -ERESTART;
         EXIT;
 cleanup: