Whamcloud - gitweb
b=4030
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 78e0eb1..a50516f 100644 (file)
 #include <linux/lustre_smfs.h>
 #include "filter_internal.h"
 
+/* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
+
 static struct lvfs_callback_ops filter_lvfs_ops;
 
 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *ea, struct obd_trans_info *);
+static struct obd_llogs *filter_grab_llog_for_group(struct obd_device *,
+                                                    int, struct obd_export *);
 
 static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
@@ -574,6 +579,50 @@ static int filter_cleanup_groups(struct obd_device *obd)
         RETURN(0);
 }
 
+static int filter_update_last_group(struct obd_device *obd, int group)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct file *filp = NULL;
+        int last_group = 0, rc;
+        loff_t off = 0;
+        ENTRY;
+
+        if (group <= filter->fo_committed_group)
+                RETURN(0);
+
+        filp = filp_open("LAST_GROUP", O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                rc = PTR_ERR(filp);
+                filp = NULL;
+                CERROR("cannot open LAST_GROUP: rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
+                GOTO(cleanup, rc);
+        }
+        LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+        CDEBUG(D_INODE, "%s: previous %d, new %d\n",
+               obd->obd_name, last_group, group);
+
+        off = 0;
+        last_group = group;
+        /* must be sync: bXXXX */
+        rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1);
+        if (rc) {
+                CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        filter->fo_committed_group = group;
+cleanup:
+        if (filp)
+                filp_close(filp, 0);
+        RETURN(rc);
+}
+
 static int filter_read_group_internal(struct obd_device *obd, int group,
                                       int create)
 {
@@ -693,6 +742,8 @@ static int filter_read_group_internal(struct obd_device *obd, int group,
                 OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
         }
 
+        filter_update_last_group(obd, group);
+        
         if (filp->f_dentry->d_inode->i_size == 0) {
                 filter->fo_last_objids[group] = FILTER_INIT_OBJID;
                 RETURN(0);
@@ -754,6 +805,9 @@ static int filter_prep_groups(struct obd_device *obd)
         struct filter_obd *filter = &obd->u.filter;
         struct dentry *dentry, *O_dentry;
         int rc = 0, cleanup_phase = 0;
+        struct file *filp = NULL;
+        int last_group;
+        loff_t off = 0;
         ENTRY;
 
         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
@@ -816,16 +870,40 @@ static int filter_prep_groups(struct obd_device *obd)
 
         cleanup_phase = 2; /* groups */
 
-        /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
-        rc = filter_read_groups(obd, FILTER_MIN_GROUPS, 1);
-        if (rc)
+        /* we have to initialize all groups before first connections from
+         * clients because they may send create/destroy for any group -bzzz */
+        filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp));
+                GOTO(cleanup, rc = PTR_ERR(filp));
+        }
+        cleanup_phase = 3; /* filp */
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
                 GOTO(cleanup, rc);
+        }
+        if (off == 0) {
+                last_group = FILTER_MIN_GROUPS;
+        } else {
+                LASSERT(last_group >= FILTER_MIN_GROUPS);
+        }
 
+        CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
+              FILTER_MIN_GROUPS, last_group);
+        filter->fo_committed_group = last_group;
+        rc = filter_read_groups(obd, last_group, 1);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        filp_close(filp, 0);
         RETURN(0);
 
  cleanup:
         switch (cleanup_phase) {
+        case 3:
+                filp_close(filp, 0);
         case 2:
                 filter_cleanup_groups(obd);
         case 1:
@@ -1381,6 +1459,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
                 GOTO(err_mntput, rc);
 
         sema_init(&filter->fo_init_lock, 1);
+        filter->fo_committed_group = 0;
         rc = filter_prep(obd);
         if (rc)
                 GOTO(err_mntput, rc);
@@ -1523,6 +1602,37 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         RETURN(0);
 }
 
+static int filter_connect_post(struct obd_export *exp)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_export_data *fed;
+        char str[PTL_NALFMT_SIZE];
+        struct obd_llogs *llog;
+        struct llog_ctxt *ctxt;
+        int rc;
+        ENTRY;
+
+        fed = &exp->exp_filter_data;
+        if (fed->fed_group < FILTER_MIN_GROUPS)
+                RETURN(0);
+
+        /* initialize llogs for connections from MDS */
+        llog = filter_grab_llog_for_group(obd, fed->fed_group, exp);
+        LASSERT(llog != NULL);
+
+        ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+
+        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+                        exp->exp_connection->c_peer.peer_nid, str);
+        CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
+               obd->obd_name, exp->exp_connection->c_peer.peer_nid,
+               str, fed->fed_group);
+
+        RETURN(rc);
+}
+
 /* nearly identical to mds_connect */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
                           struct obd_uuid *cluuid, unsigned long connect_flags)
@@ -1531,7 +1641,8 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct filter_export_data *fed;
         struct filter_client_data *fcd = NULL;
         struct filter_obd *filter = &obd->u.filter;
-        int rc;
+        struct lvfs_run_ctxt saved;
+        int rc, group;
         ENTRY;
 
         if (conn == NULL || obd == NULL || cluuid == NULL)
@@ -1547,19 +1658,54 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         spin_lock_init(&fed->fed_lock);
 
-        if (!obd->obd_replayable)
-                GOTO(cleanup, rc = 0);
+        if (obd->obd_replayable) {
+                OBD_ALLOC(fcd, sizeof(*fcd));
+                if (!fcd) {
+                        CERROR("filter: out of memory for client data\n");
+                        GOTO(cleanup, rc = -ENOMEM);
+                }
 
-        OBD_ALLOC(fcd, sizeof(*fcd));
-        if (!fcd) {
-                CERROR("filter: out of memory for client data\n");
-                GOTO(cleanup, rc = -ENOMEM);
+                memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+                fed->fed_fcd = fcd;
+
+                rc = filter_client_add(obd, filter, fed, -1);
+                if (rc)
+                        GOTO(cleanup, rc);
         }
 
-        memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
-        fed->fed_fcd = fcd;
+        if (connect_flags == 0)
+                GOTO(cleanup, rc);
+
+        /* connection from MDS */
+        group = connect_flags;
+        CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
+              obd->obd_name, exp->exp_handle.h_cookie, group);
+        
+        if (fed->fed_group != 0 && fed->fed_group != group) {
+                char str[PTL_NALFMT_SIZE];
+                portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
+                                exp->exp_connection->c_peer.peer_nid, str);
+                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+                       "earlier; now it's trying to use group %d!  This could "
+                       "be a bug in the MDS.  Tell CFS.\n",
+                       exp->exp_connection->c_peer.peer_nid, str,
+                       fed->fed_group, group);
+                GOTO(cleanup, rc = -EPROTO);
+        }
+        fed->fed_group = group;
 
-        rc = filter_client_add(obd, filter, fed, -1);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        rc = filter_read_groups(obd, group, 1);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc != 0) {
+                CERROR("can't read group %u\n", group);
+                GOTO(cleanup, rc);
+        }
+        rc = filter_group_set_fs_flags(obd, group);
+         if (rc != 0) {
+                CERROR("can't set kml flags %u\n", group);
+                GOTO(cleanup, rc);
+        }
 
 cleanup:
         if (rc) {
@@ -1723,13 +1869,13 @@ static int filter_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
-static void filter_sync_llogs(struct obd_export *dexp)
+static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
         struct filter_group_llog *fglog, *nlog;
-        struct obd_device *obd = dexp->exp_obd;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
+        ENTRY;
 
         filter = &obd->u.filter;
 
@@ -1759,11 +1905,15 @@ static void filter_sync_llogs(struct obd_export *dexp)
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog) {
-                        worked = fglog->group;
+                if (fglog == NULL)
+                        break;
+
+                worked = fglog->group;
+                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
                         ctxt = llog_get_context(fglog->llogs,
-                                                LLOG_UNLINK_REPL_CTXT);
-                        llog_sync(ctxt, dexp);
+                                        LLOG_UNLINK_REPL_CTXT);
+                        LASSERT(ctxt != NULL);
+                        llog_sync(ctxt, fglog->exp);
                 }
         } while (fglog != NULL);
 }
@@ -1795,7 +1945,7 @@ static int filter_disconnect(struct obd_export *exp, int flags)
         fsfilt_sync(obd, obd->u.filter.fo_sb);
 
         /* flush any remaining cancel messages out to the target */
-        filter_sync_llogs(exp);
+        filter_sync_llogs(obd, exp);
 
         class_export_put(exp);
         RETURN(rc);
@@ -2465,7 +2615,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -2475,8 +2624,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(obd, filter->fo_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(obd, NULL);
                 RETURN(rc);
         }
 
@@ -2595,11 +2743,13 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
         RETURN(-EINVAL);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+                                             struct obd_export *export)
 {
         struct filter_group_llog *fglog, *nlog;
         char name[32] = "CATLIST";
         struct filter_obd *filter;
+        struct llog_ctxt *ctxt;
         struct list_head *cur;
         int rc;
 
@@ -2609,12 +2759,18 @@ struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
         list_for_each(cur, &filter->fo_llog_list) {
                 fglog = list_entry(cur, struct filter_group_llog, list);
                 if (fglog->group == group) {
+                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
+                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
+                                      obd->obd_name, group, fglog->exp, export);
                         spin_unlock(&filter->fo_llog_list_lock);
-                        RETURN(fglog->llogs);
+                        goto init;
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
+        if (export == NULL)
+                RETURN(NULL);
+
         OBD_ALLOC(fglog, sizeof(*fglog));
         if (fglog == NULL)
                 RETURN(NULL);
@@ -2641,75 +2797,18 @@ struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
                 RETURN(NULL);
         }
 
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n", obd->obd_name,
-                fglog->llogs, group);
-
-        RETURN(fglog->llogs);
-}
-
-static int filter_set_info(struct obd_export *exp, __u32 keylen,
-                           void *key, __u32 vallen, void *val)
-{
-        struct lvfs_run_ctxt saved;
-        struct filter_export_data *fed = &exp->exp_filter_data;
-        struct obd_device *obd;
-        struct lustre_handle conn;
-        struct obd_llogs *llog;
-        struct llog_ctxt *ctxt;
-        __u32 group;
-        int rc = 0;
-        ENTRY;
+init:
+        if (export) {
+                fglog->exp = export;
+                ctxt = llog_get_context(fglog->llogs, LLOG_UNLINK_REPL_CTXT);
+                LASSERT(ctxt != NULL);
 
-        conn.cookie = exp->exp_handle.h_cookie;
-
-        obd = exp->exp_obd;
-        if (obd == NULL) {
-                CDEBUG(D_IOCTL, "invalid exp %p cookie "LPX64"\n",
-                       exp, conn.cookie);
-                RETURN(-EINVAL);
-        }
-
-        if (keylen < strlen("mds_conn") ||
-            memcmp(key, "mds_conn", keylen) != 0)
-                RETURN(-EINVAL);
-
-        group = *((__u32 *)val);
-        if (fed->fed_group != 0 && fed->fed_group != group) {
-                char str[PTL_NALFMT_SIZE];
-                portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
-                                exp->exp_connection->c_peer.peer_nid, str);
-                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
-                       "earlier; now it's trying to use group %d!  This could "
-                       "be a bug in the MDS.  Tell CFS.\n",
-                       exp->exp_connection->c_peer.peer_nid, str,
-                       fed->fed_group, group);
-                RETURN(-EPROTO);
+                llog_receptor_accept(ctxt, export->exp_imp_reverse);
         }
-        fed->fed_group = group;
-        CWARN("Received MDS connection ("LPX64"); group %d\n", conn.cookie,
-              group);
+        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
+               obd->obd_name, fglog->llogs, group);
 
-        LASSERT(rc == 0);
-
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = filter_read_groups(obd, group, 1);
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc != 0) {
-                CERROR("can't read group %u\n", group);
-                RETURN(rc);
-        }
-        rc = filter_group_set_fs_flags(obd, group);
-         if (rc != 0) {
-                CERROR("can't set kml flags %u\n", group);
-                RETURN(rc);
-        }
-        llog = filter_grab_llog_for_group(obd, group);
-        LASSERT(llog != NULL);
-
-        ctxt = llog_get_context(llog, LLOG_UNLINK_REPL_CTXT);
-        LASSERT(ctxt != NULL);
-        rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-        RETURN(rc);
+        RETURN(fglog->llogs);
 }
 
 int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
@@ -2824,9 +2923,10 @@ static int filter_llog_finish(struct obd_device *obd,
         RETURN(rc);
 }
 
-static int filter_llog_connect(struct obd_device *obd,
+static int filter_llog_connect(struct obd_export *exp,
                                struct llogd_conn_body *body) 
 {
+        struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
         struct obd_llogs *llog;
         int rc;
@@ -2836,7 +2936,7 @@ static int filter_llog_connect(struct obd_device *obd,
                (unsigned) body->lgdc_logid.lgl_ogr,
                (unsigned) body->lgdc_logid.lgl_oid,
                (unsigned) body->lgdc_logid.lgl_ogen);
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr);
+        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
         LASSERT(llog != NULL);
         ctxt = llog_get_context(llog, body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
@@ -2862,11 +2962,11 @@ static struct obd_ops filter_obd_ops = {
         .o_attach         = filter_attach,
         .o_detach         = filter_detach,
         .o_get_info       = filter_get_info,
-        .o_set_info       = filter_set_info,
         .o_setup          = filter_setup,
         .o_precleanup     = filter_precleanup,
         .o_cleanup        = filter_cleanup,
         .o_connect        = filter_connect,
+        .o_connect_post   = filter_connect_post,
         .o_disconnect     = filter_disconnect,
         .o_statfs         = filter_statfs,
         .o_getattr        = filter_getattr,
@@ -2893,11 +2993,11 @@ static struct obd_ops filter_sanobd_ops = {
         .o_attach         = filter_attach,
         .o_detach         = filter_detach,
         .o_get_info       = filter_get_info,
-        .o_set_info       = filter_set_info,
         .o_setup          = filter_san_setup,
         .o_precleanup     = filter_precleanup,
         .o_cleanup        = filter_cleanup,
         .o_connect        = filter_connect,
+        .o_connect_post   = filter_connect_post,
         .o_disconnect     = filter_disconnect,
         .o_statfs         = filter_statfs,
         .o_getattr        = filter_getattr,