Whamcloud - gitweb
Alex's fix for cross-MDS mkdir recovery (b=3869)
[fs/lustre-release.git] / lustre / mds / mds_lov.c
index 81b22b3..b61caf0 100644 (file)
@@ -61,11 +61,11 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
         int i;
         ENTRY;
 
-        lock_kernel();
+        spin_lock(&mds->mds_lov_lock);
         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
                 if (ids[i] > (mds->mds_lov_objids)[i])
                         (mds->mds_lov_objids)[i] = ids[i];
-        unlock_kernel();
+        spin_unlock(&mds->mds_lov_lock);
         EXIT;
 }
 
@@ -176,6 +176,7 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
 {
         struct mds_obd *mds = &obd->u.mds;
         int valsize = sizeof(mds->mds_lov_desc), rc, i;
+        int old_count = mds->mds_lov_desc.ld_tgt_count;
         ENTRY;
 
         rc = obd_get_info(lov, strlen("lovdesc") + 1, "lovdesc", &valsize,
@@ -183,11 +184,32 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
         if (rc)
                 RETURN(rc);
 
+        /* The size of the LOV target table may have increased. */
+        if (old_count >= mds->mds_lov_desc.ld_tgt_count) {
+                obd_id *ids;
+                int     size;
+
+                size = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
+                OBD_ALLOC(ids, size);
+                if (ids == NULL)
+                        RETURN(-ENOMEM);
+
+                memset(ids, 0, size);
+
+                if (mds->mds_lov_objids != NULL) {
+                        int oldsize = old_count * sizeof(*ids);
+
+                        memcpy(ids, mds->mds_lov_objids, oldsize);
+                        OBD_FREE(mds->mds_lov_objids, oldsize);
+                }
+                mds->mds_lov_objids = ids;
+        }
+
         i = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
         if (i > mds->mds_max_mdsize)
                 mds->mds_max_mdsize = i;
         mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count *
-                sizeof(struct llog_cookie);
+                                  sizeof(struct llog_cookie);
         mds->mds_has_lov_desc = 1;
         RETURN(0);
 }
@@ -207,6 +229,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (mds->mds_osc_obd)
                 RETURN(0);
 
+        spin_lock_init(&mds->mds_lov_lock);
         mds->mds_osc_obd = class_name2obd(lov_name);
         if (!mds->mds_osc_obd) {
                 CERROR("MDS cannot locate LOV %s\n",
@@ -218,7 +241,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         CDEBUG(D_HA, "obd: %s osc: %s lov_name: %s\n",
                obd->obd_name, mds->mds_osc_obd->obd_name, lov_name);
 
-        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid);
+        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid,
+                         mds->mds_num + FILTER_GROUP_FIRST_MDS);
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n",
                        lov_name, rc);
@@ -522,8 +546,9 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 }
 
 struct mds_lov_sync_info {
-        struct obd_device *mlsi_obd; /* the lov device to sync */
-        struct obd_uuid   *mlsi_uuid;  /* target to sync */
+        struct obd_device *mlsi_obd;      /* the mds to sync */
+        struct obd_device *mlsi_watched;  /* new lov target */
+        int                mlsi_index;    /* index into mds_lov_objids */ 
 };
 
 int mds_lov_synchronize(void *data)
@@ -531,44 +556,76 @@ int mds_lov_synchronize(void *data)
         struct mds_lov_sync_info *mlsi = data;
         struct llog_ctxt *ctxt;
         struct obd_device *obd;
+        struct obd_device *watched;
+        struct mds_obd *mds;
         struct obd_uuid *uuid;
+        obd_id vals[2];
         unsigned long flags;
+        __u32  vallen;
+        __u32  group;
+        int old_count;
+        int count;
+        int index;
         int rc;
-        int valsize;
-        __u32 group;
+        char name[32] = "CATLIST";
 
         lock_kernel();
+
         ptlrpc_daemonize();
-        snprintf (current->comm, sizeof (current->comm), "%s", "mds_lov_sync");
 
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
 
+        unlock_kernel();
+
         obd = mlsi->mlsi_obd;
-        uuid = mlsi->mlsi_uuid;
+        watched = mlsi->mlsi_watched;
+        index = mlsi->mlsi_index;
+
+        LASSERT(obd != NULL);
+        LASSERT(watched != NULL);
 
         OBD_FREE(mlsi, sizeof(*mlsi));
 
+        mds = &obd->u.mds;
+        uuid = &watched->u.cli.cl_import->imp_target_uuid;
 
-        LASSERT(obd != NULL);
-        LASSERT(uuid != NULL);
+        group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+        rc = obd_set_info(watched->obd_self_export, strlen("mds_conn"),
+                          "mds_conn", sizeof(group), &group);
+        if (rc)
+                RETURN(rc);
 
-        group = FILTER_GROUP_FIRST_MDS + obd->u.mds.mds_num;
-        valsize = sizeof(group);
-        rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), 
-                          "mds_conn", valsize, &group);
-        if (rc != 0) {
-                CERROR("obd_set_info(mds_conn) failed %d\n", rc);
+        old_count = mds->mds_lov_desc.ld_tgt_count;
+
+        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+        if (rc)
                 RETURN(rc);
-        }
-        
+
+        count = mds->mds_lov_desc.ld_tgt_count;
+        LASSERT(count >= old_count);
+
+        vallen = sizeof(vals[1]);
+        rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
+                          "last_id", &vallen, &vals[1]);
+        if (rc)
+                RETURN(rc);
+
+        vals[0] = index;
+        rc = mds_lov_set_info(obd->obd_self_export, strlen("next_id"),
+                              "next_id", 2, vals);
+        if (rc)
+                RETURN(rc);
+
+        obd_llog_finish(obd, &obd->obd_llogs, old_count);
+        obd_llog_cat_initialize(obd, &obd->obd_llogs, count, name);
+
         ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
         LASSERT(ctxt != NULL);
 
-        rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
-                          NULL, NULL, uuid);
+        rc = llog_connect(ctxt, count, NULL, NULL, uuid);
         if (rc != 0) {
                 CERROR("%s: failed at llog_origin_connect: %d\n", 
                        obd->obd_name, rc);
@@ -577,6 +634,7 @@ int mds_lov_synchronize(void *data)
         
         CWARN("MDS %s: %s now active, resetting orphans\n",
               obd->obd_name, uuid->uuid);
+
         rc = mds_lov_clearorphans(&obd->u.mds, uuid);
         if (rc != 0) {
                 CERROR("%s: failed at mds_lov_clearorphans: %d\n", 
@@ -587,7 +645,8 @@ int mds_lov_synchronize(void *data)
         RETURN(0);
 }
 
-int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid)
+int mds_lov_start_synchronize(struct obd_device *obd,
+                              struct obd_device *watched, void *data)
 {
         struct mds_lov_sync_info *mlsi;
         int rc;
@@ -599,7 +658,8 @@ int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid)
                 RETURN(-ENOMEM);
 
         mlsi->mlsi_obd = obd;
-        mlsi->mlsi_uuid = uuid;
+        mlsi->mlsi_watched = watched;
+        mlsi->mlsi_index = (int)data;
 
         rc = kernel_thread(mds_lov_synchronize, mlsi, CLONE_VM | CLONE_FILES);
         if (rc < 0)
@@ -614,9 +674,9 @@ int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid)
         RETURN(rc);
 }
 
-int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
+int mds_notify(struct obd_device *obd, struct obd_device *watched,
+               int active, void *data)
 {
-        struct obd_uuid *uuid;
         int rc = 0;
         ENTRY;
 
@@ -629,18 +689,20 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
                 RETURN(-EINVAL);
         }
 
-        uuid = &watched->u.cli.cl_import->imp_target_uuid;
         if (obd->obd_recovering) {
+                struct obd_uuid *uuid;
+                uuid = &watched->u.cli.cl_import->imp_target_uuid;
+
                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
                       obd->obd_name, uuid->uuid);
         } else {
-                rc = mds_lov_start_synchronize(obd, uuid);
+                rc = mds_lov_start_synchronize(obd, watched, data);
         }
         RETURN(rc);
 }
 
-int mds_set_info(struct obd_export *exp, obd_count keylen,
-                 void *key, obd_count vallen, void *val)
+int mds_lov_set_info(struct obd_export *exp, obd_count keylen,
+                     void *key, obd_count vallen, void *val)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct mds_obd *mds = &obd->u.mds;
@@ -651,41 +713,19 @@ int mds_set_info(struct obd_export *exp, obd_count keylen,
 
         if (KEY_IS("next_id")) {
                 obd_id *id = (obd_id *)val;
-                int idx, rc;
+                int idx;
 
                 /* XXX - this really should be vallen != (2 * sizeof(*id)) *
                  * Just following the precedent set by lov_set_info.       */
                 if (vallen != 2)
                         RETURN(-EINVAL);
 
-                if ((idx = *id) != *id)
+                idx = *id;
+                if ((idx != *id) || (idx >= mds->mds_lov_desc.ld_tgt_count))
                         RETURN(-EINVAL);
 
                 CDEBUG(D_CONFIG, "idx: %d id: %llu\n", idx, *(id + 1));
 
-                /* The size of the LOV target table may have increased. */
-                if (idx >= mds->mds_lov_desc.ld_tgt_count) {
-                        obd_id *ids;
-                        int oldsize, size;
-
-                        oldsize = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
-                        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
-                        if (rc)
-                                RETURN(rc);
-                        if (idx >= mds->mds_lov_desc.ld_tgt_count)
-                                RETURN(-EINVAL);
-                        size = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
-                        OBD_ALLOC(ids, size);
-                        if (ids == NULL)
-                                RETURN(-ENOMEM);
-                        memset(ids, 0, size);
-                        if (mds->mds_lov_objids != NULL) {
-                                memcpy(ids, mds->mds_lov_objids, oldsize);
-                                OBD_FREE(mds->mds_lov_objids, oldsize);
-                        }
-                        mds->mds_lov_objids = ids;
-                }
-
                 mds->mds_lov_objids[idx] = *++id;
                 CDEBUG(D_CONFIG, "objid: %d: %lld\n", idx, *id);
                 /* XXX - should we be writing this out here ? */
@@ -732,7 +772,6 @@ int mds_lov_update_config(struct obd_device *obd, int clean)
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc == 0) {
                 mds->mds_config_version = version;
-                rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
         }
         CWARN("Finished applying configuration log %s: %d\n", name, rc);
 
@@ -837,9 +876,11 @@ int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
         oa->o_generation = inode->i_generation;
         oa->o_uid = 0; /* must have 0 uid / gid on OST */
         oa->o_gid = 0;
+        oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
 
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE |
-                      OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+                      OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
+                      OBD_MD_FLGROUP;
         valid = OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME |
                 OBD_MD_FLCTIME;
         obdo_from_inode(oa, inode, valid);