Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index bdad15d..570dc00 100644 (file)
@@ -88,7 +88,10 @@ static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                              void *cb_data, int error)
 {
         struct obd_export *exp = cb_data;
+        LASSERT(exp->exp_obd == obd);
         obd_transno_commit_cb(obd, transno, exp, error);
+        atomic_dec(&exp->exp_cb_count);
+        class_export_put(exp);
 }
 
 int filter_version_get_check(struct obd_export *exp,
@@ -164,6 +167,8 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
                        fed->fed_lr_idx, fed->fed_lr_off);
                 err = -EINVAL;
         } else {
+                class_export_get(exp); /* released when the cb is called */
+                atomic_inc(&exp->exp_cb_count);
                 if (!force_sync)
                         force_sync = fsfilt_add_journal_cb(exp->exp_obd,
                                                            last_rcvd,
@@ -213,7 +218,7 @@ static int lprocfs_init_rw_stats(struct obd_device *obd,
 
         num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) +
                                                         LPROC_FILTER_LAST - 1;
-        *stats = lprocfs_alloc_stats(num_stats, 0);
+        *stats = lprocfs_alloc_stats(num_stats, LPROCFS_STATS_FLAG_NOPERCPU);
         if (*stats == NULL)
                 return -ENOMEM;
 
@@ -272,8 +277,9 @@ static int filter_export_stats_init(struct obd_device *obd,
                 if (rc)
                         RETURN(rc);
                 /* Always add in ldlm_stats */
-                tmp->nid_ldlm_stats = lprocfs_alloc_stats(LDLM_LAST_OPC -
-                                                          LDLM_FIRST_OPC, 0);
+                tmp->nid_ldlm_stats = 
+                        lprocfs_alloc_stats(LDLM_LAST_OPC - LDLM_FIRST_OPC,
+                                            LPROCFS_STATS_FLAG_NOPERCPU);
                 if (tmp->nid_ldlm_stats == NULL)
                         return -ENOMEM;
 
@@ -432,7 +438,7 @@ static int filter_client_free(struct obd_export *exp)
         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
                 /* Don't force sync on disconnect if aborting recovery,
                  * or it does num_clients * num_osts.  b=17194 */
-                int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
+                int need_sync = exp->exp_need_sync &&
                                  !(exp->exp_flags&OBD_OPT_ABORT_RECOV);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_lcd,
@@ -1575,9 +1581,8 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
 }
 
 static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
-                                  obd_id group)
+                                  obd_id group, struct lustre_handle *lockh)
 {
-        struct lustre_handle lockh;
         int flags = LDLM_AST_DISCARD_DATA, rc;
         struct ldlm_res_id res_id;
         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
@@ -1589,15 +1594,19 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
                                     &policy, LCK_PW, &flags, ldlm_blocking_ast,
                                     ldlm_completion_ast, NULL, NULL, 0, NULL,
-                                    NULL, &lockh);
-
-        /* We only care about the side-effects, just drop the lock. */
-        if (rc == ELDLM_OK)
-                ldlm_lock_decref(&lockh, LCK_PW);
-
+                                    NULL, lockh);
+        if (rc != ELDLM_OK)
+                lockh->cookie = 0;
         RETURN(rc);
 }
 
+static void filter_fini_destroy(struct obd_device *obd,
+                                struct lustre_handle *lockh)
+{
+        if (lockh->cookie)
+                ldlm_lock_decref(lockh, LCK_PW);
+}
+
 /* This is vfs_unlink() without down(i_sem).  If we call regular vfs_unlink()
  * we have 2.6 lock ordering issues with filter_commitrw_write() as it takes
  * i_sem before starting a handle, while filter_destroy() + vfs_unlink do the
@@ -2089,11 +2098,6 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         init_mutex(&filter->fo_init_lock);
         filter->fo_committed_group = 0;
-
-        rc = filter_prep(obd);
-        if (rc)
-                GOTO(err_ops, rc);
-
         filter->fo_destroys_in_progress = 0;
         for (i = 0; i < 32; i++)
                 sema_init(&filter->fo_create_locks[i], 1);
@@ -2109,6 +2113,10 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
 
+        rc = filter_prep(obd);
+        if (rc)
+                GOTO(err_ops, rc);
+
         CFS_INIT_LIST_HEAD(&filter->fo_llog_list);
         spin_lock_init(&filter->fo_llog_list_lock);
 
@@ -2168,17 +2176,14 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         if (obd->obd_recovering) {
                 LCONSOLE_WARN("OST %s now serving %s (%s%s%s), but will be in "
                               "recovery for at least %d:%.02d, or until %d "
-                              "client%s reconnect. During this time new clients"
-                              " will not be allowed to connect. "
-                              "Recovery progress can be monitored by watching "
-                              "/proc/fs/lustre/obdfilter/%s/recovery_status.\n",
+                              "client%s reconnect%s.\n",
                               obd->obd_name, lustre_cfg_string(lcfg, 1),
                               label ?: "", label ? "/" : "", str,
                               obd->obd_recovery_timeout / 60,
                               obd->obd_recovery_timeout % 60,
                               obd->obd_max_recoverable_clients,
                               (obd->obd_max_recoverable_clients == 1) ? "":"s",
-                              obd->obd_name);
+                              (obd->obd_max_recoverable_clients == 1) ? "s":"");
         } else {
                 LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
@@ -2573,6 +2578,9 @@ static int filter_llog_connect(struct obd_export *exp,
               obd->obd_name, body->lgdc_logid.lgl_oid,
               body->lgdc_logid.lgl_ogr, body->lgdc_logid.lgl_ogen);
 
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        obd->u.filter.fo_mds_ost_sync = 1;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
         rc = llog_connect(ctxt, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
         llog_ctxt_put(ctxt);
@@ -2630,8 +2638,7 @@ static int filter_precleanup(struct obd_device *obd,
                 break;
         case OBD_CLEANUP_EXPORTS:
                 /* Stop recovery before namespace cleanup. */
-                target_stop_recovery_thread(obd);
-                target_cleanup_recovery(obd);
+                target_recovery_fini(obd);
                 rc = filter_llog_preclean(obd);
                 break;
         }
@@ -2647,14 +2654,8 @@ static int filter_cleanup(struct obd_device *obd)
                 LCONSOLE_WARN("%s: shutting down for failover; client state "
                               "will be preserved.\n", obd->obd_name);
 
-        if (!list_empty(&obd->obd_exports)) {
-                CERROR("%s: still has clients!\n", obd->obd_name);
-                class_disconnect_exports(obd);
-                if (!list_empty(&obd->obd_exports)) {
-                        CERROR("still has exports after forced cleanup?\n");
-                        RETURN(-EBUSY);
-                }
-        }
+        obd_exports_barrier(obd);
+        obd_zombie_barrier();
 
         lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
         lprocfs_free_per_client_stats(obd);
@@ -3554,7 +3555,7 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
 static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
                                      struct filter_obd *filter)
 {
-        struct obdo doa; /* XXX obdo on stack */
+        struct obdo doa = { 0 }; /* XXX obdo on stack */
         obd_id last, id;
         int rc = 0;
         int skip_orphan;
@@ -3581,7 +3582,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
 
         skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN);
 
-        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"%s\n",
+        CDEBUG(D_HA, "%s: deleting orphan objects from "LPU64" to "LPU64"%s\n",
                exp->exp_obd->obd_name, oa->o_id + 1, last,
                skip_orphan ? ", orphan objids won't be reused any more." : ".");
 
@@ -3760,6 +3761,19 @@ static int filter_use_existing_obj(struct obd_device *obd,
         return rc;
 }
 
+static __u64 filter_calc_free_inodes(struct obd_device *obd)
+{
+        int rc;
+        __u64 os_ffree = -1;
+
+        spin_lock(&obd->obd_osfs_lock);
+        rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, cfs_time_shift_64(1));
+        if (rc == 0)
+                os_ffree = obd->obd_osfs.os_ffree;
+        spin_unlock(&obd->obd_osfs_lock);
+
+        return os_ffree;
+}
 
 /* We rely on the fact that only one thread will be creating files in a given
  * group at a time, which is why we don't need an atomic filter_get_new_id.
@@ -3780,6 +3794,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
         struct obd_statfs *osfs;
         int err = 0, rc = 0, recreate_obj = 0, i;
         cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT/2);
+        __u64 os_ffree;
         obd_id next_id;
         void *handle = NULL;
         ENTRY;
@@ -3892,9 +3907,19 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                                    S_IFREG |  S_ISUID | S_ISGID | 0666, NULL);
                 if (rc) {
                         CERROR("create failed rc = %d\n", rc);
+                        if (rc == -ENOSPC) {
+                                os_ffree = filter_calc_free_inodes(obd);
+                                if (os_ffree != -1)
+                                        CERROR("%s: free inode "LPU64"\n",
+                                               obd->obd_name, os_ffree);
+                        }
                         GOTO(cleanup, rc);
                 }
 
+                if (dchild->d_inode)
+                        CDEBUG(D_INFO, "objid "LPU64" got inum %lu\n", next_id,
+                                       dchild->d_inode->i_ino);
+
 set_last_id:
                 if (!recreate_obj) {
                         filter_set_last_id(filter, next_id, group);
@@ -4023,6 +4048,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
         struct obd_device *obd;
         struct filter_obd *filter;
         struct dentry *dchild = NULL, *dparent = NULL;
+        struct lustre_handle lockh = { 0 };
         struct lvfs_run_ctxt saved;
         void *handle = NULL;
         struct llog_cookie *fcc = NULL;
@@ -4075,7 +4101,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc = -ENOENT);
         }
 
-        filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
+        filter_prepare_destroy(obd, oa->o_id, oa->o_gr, &lockh);
 
         /* Our MDC connection is established by the MDS to us */
         if (oa->o_valid & OBD_MD_FLCOOKIE) {
@@ -4171,6 +4197,8 @@ cleanup:
         case 3:
                 filter_parent_unlock(dparent);
         case 2:
+                filter_fini_destroy(obd, &lockh);
+
                 f_dput(dchild);
                 if (fcc != NULL)
                         OBD_FREE(fcc, sizeof(*fcc));
@@ -4414,12 +4442,12 @@ static int filter_set_mds_conn(struct obd_export *exp, void *val)
         if (rc)
                 goto out;
 
-        lquota_setinfo(filter_quota_interface_ref, obd, exp);
-
         if (group == FILTER_GROUP_MDS0) {
                 /* setup llog group 1 for interop */
                 filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG);
         }
+
+        lquota_setinfo(filter_quota_interface_ref, obd, exp);
 out:
         RETURN(rc);
 }
@@ -4474,7 +4502,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
 
         switch (cmd) {
         case OBD_IOC_ABORT_RECOVERY: {
-                CERROR("aborting recovery for device %s\n", obd->obd_name);
+                LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
                 target_stop_recovery_thread(obd);
                 RETURN(0);
         }