Whamcloud - gitweb
not send LOV EA under replay, we can't know about they size at this
[fs/lustre-release.git] / lustre / mds / mds_lov.c
index b623979..0b4120b 100644 (file)
@@ -159,6 +159,174 @@ void mds_lov_destroy_objids(struct obd_device *obd)
         EXIT;
 }
 
+/**
+ * currently exist two ways for know about ost count and max ost index.
+ * first - after ost is connected to mds and sync process finished
+ * second - get from lmm in recovery process, in case when mds not have configs,
+ * and ost isn't registered in mgs.
+ *
+ * \param mds pointer to mds structure
+ * \param index maxium ost index
+ *
+ * \retval -ENOMEM is not hame memory for new page
+ * \retval 0 is update passed
+ */
+static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
+{
+        __u32 page = index / OBJID_PER_PAGE();
+        __u32 off = index % OBJID_PER_PAGE();
+        obd_id *data =  mds->mds_lov_page_array[page];
+
+        if (data == NULL) {
+                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
+                if (data == NULL)
+                        RETURN(-ENOMEM);
+
+                mds->mds_lov_page_array[page] = data;
+        }
+
+        if (index > mds->mds_lov_objid_max_index) {
+                mds->mds_lov_objid_lastpage = page;
+                mds->mds_lov_objid_lastidx = off;
+                mds->mds_lov_objid_max_index = index;
+        }
+
+        /* workaround - New target not in objids file; increase mdsize */
+        /* ld_tgt_count is used as the max index everywhere, despite its name. */
+        if (data[off] == 0) {
+                __u32 stripes;
+
+                data[off] = 1;
+                mds->mds_lov_objid_count++;
+                stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
+                                mds->mds_lov_objid_count);
+
+                mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
+                mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+
+                CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
+                       " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
+                       mds->mds_max_cookiesize);
+        }
+
+        EXIT;
+        return 0;
+}
+
+int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+{
+        struct lov_ost_data_v1 *data;
+        __u32 count;
+        int rc = 0;
+        __u32 j;
+
+        /* if we create file without objects - lmm is NULL */
+        if (lmm == NULL)
+                return 0;
+
+        switch (le32_to_cpu(lmm->lmm_magic)) {
+                case LOV_MAGIC_V1:
+                        count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
+                        data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
+                        break;
+                case LOV_MAGIC_V3:
+                        count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
+                        data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
+                        break;
+                default:
+                        CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
+                        RETURN(-EINVAL);
+        }
+
+
+        mutex_down(&obd->obd_dev_sem);
+        for (j = 0; j < count; j++) {
+                __u32 i = le32_to_cpu(data[j].l_ost_idx);
+                if (mds_lov_update_max_ost(&obd->u.mds, i)) {
+                        rc = -ENOMEM;
+                        break;
+                }
+        }
+        mutex_up(&obd->obd_dev_sem);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(mds_lov_prepare_objids);
+
+void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int j;
+        struct lov_ost_data_v1 *obj;
+        int count;
+        ENTRY;
+
+        /* if we create file without objects - lmm is NULL */
+        if (lmm == NULL)
+                return;
+
+        switch (le32_to_cpu(lmm->lmm_magic)) {
+                case LOV_MAGIC_V1:
+                        count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
+                        obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
+                        break;
+                case LOV_MAGIC_V3:
+                        count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
+                        obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
+                        break;
+                default:
+                        CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
+                        return;
+        }
+
+        for (j = 0; j < count; j++) {
+                __u32 i = le32_to_cpu(obj[j].l_ost_idx);
+                obd_id id = le64_to_cpu(obj[j].l_object_id);
+                __u32 page = i / OBJID_PER_PAGE();
+                __u32 idx = i % OBJID_PER_PAGE();
+                obd_id *data;
+
+                data = mds->mds_lov_page_array[page];
+
+                CDEBUG(D_INODE,"update last object for ost %u"
+                       " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
+                if (id > data[idx]) {
+                        data[idx] = id;
+                        cfs_bitmap_set(mds->mds_lov_page_dirty, page);
+                }
+        }
+        EXIT;
+        return;
+}
+EXPORT_SYMBOL(mds_lov_update_objids);
+
+
+static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
+                                    __u32 count)
+{
+        __u32 i;
+        __u32 stripes;
+
+        for(i = 0; i < count; i++) {
+                if (data[i] == 0)
+                        continue;
+
+                mds->mds_lov_objid_count++;
+        }
+
+        stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
+                         mds->mds_lov_objid_count);
+
+        mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
+        mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+
+        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
+               "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
+
+        EXIT;
+        return 0;
+}
+
 static int mds_lov_read_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -178,34 +346,33 @@ static int mds_lov_read_objids(struct obd_device *obd)
         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
 
         for (i = 0; i < page; i++) {
-                obd_id *data =  mds->mds_lov_page_array[i];
                 loff_t off_old = off;
 
-                LASSERT(data == NULL);
-                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
-                if (data == NULL)
+                LASSERT(mds->mds_lov_page_array[i] == NULL);
+                OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
+                if (mds->mds_lov_page_array[i] == NULL)
                         GOTO(out, rc = -ENOMEM);
 
-                mds->mds_lov_page_array[i] = data;
-
-                rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
+                rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
                 if (rc < 0) {
                         CERROR("Error reading objids %d\n", rc);
                         GOTO(out, rc);
                 }
-                if (off == off_old)
-                        break; // eof
 
                 count += (off - off_old)/sizeof(obd_id);
+                if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
+                         CERROR("Can't update mds data\n");
+                         GOTO(out, rc = -EIO);
+                 }
+
+                if (off == off_old)
+                        break; // eof
         }
-        mds->mds_lov_objid_count = count;
-       if (count) {
-                count --;
-                mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
-                mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
-       }
-        CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
+        mds->mds_lov_objid_lastpage = i;
+        mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
+
+        CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
 out:
         mds_lov_dump_objids("read",obd);
@@ -249,7 +416,7 @@ int mds_lov_write_objids(struct obd_device *obd)
 EXPORT_SYMBOL(mds_lov_write_objids);
 
 static int mds_lov_get_objid(struct obd_device * obd,
-                             __u32 idx)
+                             obd_id idx)
 {
         struct mds_obd *mds = &obd->u.mds;
         unsigned int page;
@@ -261,14 +428,6 @@ static int mds_lov_get_objid(struct obd_device * obd,
         page = idx / OBJID_PER_PAGE();
         off = idx % OBJID_PER_PAGE();
         data = mds->mds_lov_page_array[page];
-        if (data == NULL) {
-                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
-                if (data == NULL)
-                        GOTO(out, rc = -ENOMEM);
-
-                mds->mds_lov_page_array[page] = data;
-        }
-
         if (data[off] == 0) {
                 /* We never read this lastid; ask the osc */
                 struct obd_id_info lastid;
@@ -281,11 +440,6 @@ static int mds_lov_get_objid(struct obd_device * obd,
                 if (rc)
                         GOTO(out, rc);
 
-                if (idx > mds->mds_lov_objid_count) {
-                        mds->mds_lov_objid_count = idx;
-                        mds->mds_lov_objid_lastpage = page;
-                        mds->mds_lov_objid_lastidx = off;
-                }
                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
         }
 out:
@@ -326,9 +480,6 @@ static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
 
         LASSERT(!obd->obd_recovering);
 
-        /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
-        LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
-
         info.idx = idx;
         info.data = id;
         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
@@ -346,7 +497,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lov_desc *ld;
-        __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
+        __u32 valsize = sizeof(mds->mds_lov_desc);
         int rc = 0;
         ENTRY;
 
@@ -365,14 +516,12 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
 
-        stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
-                               mds->mds_lov_desc.ld_tgt_count);
+        mutex_down(&obd->obd_dev_sem);
+        rc = mds_lov_update_max_ost(mds, idx);
+        mutex_up(&obd->obd_dev_sem);
+        if (rc != 0)
+                GOTO(out, rc );
 
-        mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
-        mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
-        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
-               "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
-               stripes);
 
         /* If we added a target we have to reconnect the llogs */
         /* We only _need_ to do this at first add (idx), or the first time
@@ -384,7 +533,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         /*XXX this notifies the MDD until lov handling use old mds code */
         if (obd->obd_upcall.onu_owner) {
                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
-                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
                                                  obd->obd_upcall.onu_owner);
         }
 out:
@@ -406,8 +555,6 @@ static int mds_lov_update_mds(struct obd_device *obd,
         ENTRY;
 
         /* Don't let anyone else mess with mds_lov_objids now */
-        mutex_down(&obd->obd_dev_sem);
-
         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
         if (rc)
                 GOTO(out, rc);
@@ -446,7 +593,6 @@ static int mds_lov_update_mds(struct obd_device *obd,
                         data[off], idx, rc);
         }
 out:
-        mutex_up(&obd->obd_dev_sem);
         RETURN(rc);
 }
 
@@ -472,6 +618,21 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 RETURN(-ENOTCONN);
         }
 
+        mutex_down(&obd->obd_dev_sem);
+        rc = mds_lov_read_objids(obd);
+        mutex_up(&obd->obd_dev_sem);
+        if (rc) {
+                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
+                GOTO(err_exit, rc);
+        }
+
+        rc = obd_register_observer(mds->mds_osc_obd, obd);
+        if (rc) {
+                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
+                       lov_name, rc);
+                GOTO(err_exit, rc);
+        }
+
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
                 RETURN(-ENOMEM);
@@ -494,24 +655,6 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         }
         mds->mds_osc_exp = class_conn2export(&conn);
 
-        rc = obd_register_observer(mds->mds_osc_obd, obd);
-        if (rc) {
-                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
-                       lov_name, rc);
-                GOTO(err_discon, rc);
-        }
-
-        /* Deny new client connections until we are sure we have some OSTs */
-        obd->obd_no_conn = 1;
-
-        mutex_down(&obd->obd_dev_sem);
-        rc = mds_lov_read_objids(obd);
-        if (rc) {
-                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
-                GOTO(err_reg, rc);
-        }
-        mutex_up(&obd->obd_dev_sem);
-
         /* I want to see a callback happen when the OBD moves to a
          * "For General Use" state, and that's when we'll call
          * set_nextid().  The class driver can help us here, because
@@ -523,11 +666,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          */
         RETURN(rc);
 
-err_reg:
-        mutex_up(&obd->obd_dev_sem);
-        obd_register_observer(mds->mds_osc_obd, NULL);
-err_discon:
-        obd_disconnect(mds->mds_osc_exp);
+err_exit:
         mds->mds_osc_exp = NULL;
         mds->mds_osc_obd = ERR_PTR(rc);
         RETURN(rc);
@@ -554,18 +693,6 @@ int mds_lov_disconnect(struct obd_device *obd)
         RETURN(rc);
 }
 
-/* Collect the preconditions we need to allow client connects */
-static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
-{
-        if (flag & CONFIG_LOG)
-                obd->u.mds.mds_fl_cfglog = 1;
-        if (flag & CONFIG_SYNC)
-                obd->u.mds.mds_fl_synced = 1;
-        if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
-                /* Open for clients */
-                obd->obd_no_conn = 0;
-}
-
 struct mds_lov_sync_info {
         struct obd_device *mlsi_obd;     /* the lov device to sync */
         struct obd_device *mlsi_watched; /* target osc */
@@ -673,7 +800,7 @@ static int __mds_lov_synchronize(void *data)
                  * in mdd is removed, This hack should be removed.
                  */
                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
-                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
                                                  obd->obd_upcall.onu_owner);
         }
         EXIT;
@@ -720,6 +847,7 @@ int mds_lov_start_synchronize(struct obd_device *obd,
         if (mlsi == NULL)
                 RETURN(-ENOMEM);
 
+        LASSERT(data);
         mlsi->mlsi_obd = obd;
         mlsi->mlsi_watched = watched;
         mlsi->mlsi_index = *(__u32 *)data;
@@ -761,19 +889,22 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
         int rc = 0;
         ENTRY;
 
+        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
+
         switch (ev) {
         /* We only handle these: */
         case OBD_NOTIFY_ACTIVE:
+                /* lov want one or more _active_ targets for work */
+                /* activate event should be pass lov idx as argument */
         case OBD_NOTIFY_SYNC:
         case OBD_NOTIFY_SYNC_NONBLOCK:
+                /* sync event should be pass lov idx as argument */
                 break;
         case OBD_NOTIFY_CONFIG:
-                mds_allow_cli(obd, (unsigned long)data);
         default:
                 RETURN(0);
         }
 
-        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
                 CERROR("unexpected notification of %s %s!\n",
                        watched->obd_type->typ_name, watched->obd_name);
@@ -787,12 +918,8 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 /* We still have to fix the lov descriptor for ost's added
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
-                mutex_down(&obd->obd_dev_sem);
                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
                                         &watched->u.cli.cl_target_uuid);
-                mutex_up(&obd->obd_dev_sem);
-                if (rc == 0)
-                        mds_allow_cli(obd, CONFIG_SYNC);
                 RETURN(rc);
         }