Whamcloud - gitweb
LU-2057 ofd: implement jobstats and few fixes
authorMikhail Pershin <tappro@whamcloud.com>
Mon, 1 Oct 2012 10:42:30 +0000 (14:42 +0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 2 Oct 2012 03:50:19 +0000 (23:50 -0400)
- implement jobstats for OFD like in obdfilter
- fix ofd_health_check to ignore external lu_env and use local one
  always, because obd_proc_read_health uses it without env
- do object sync only if its version is not yet committed

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Idf74ef280f9d71e3db6b45cfacb2f5303b302a77
Reviewed-on: http://review.whamcloud.com/4143
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_obd.c

index 3fbfd1e..c40774b 100644 (file)
@@ -464,6 +464,8 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "capa",                lprocfs_ofd_rd_capa,
                                 lprocfs_ofd_wr_capa, 0 },
        { "capa_count",          lprocfs_ofd_rd_capa_count, 0, 0 },
        { "capa",                lprocfs_ofd_rd_capa,
                                 lprocfs_ofd_wr_capa, 0 },
        { "capa_count",          lprocfs_ofd_rd_capa_count, 0, 0 },
+       { "job_cleanup_interval", lprocfs_rd_job_interval,
+                                 lprocfs_wr_job_interval, 0},
        { 0 }
 };
 
        { 0 }
 };
 
@@ -618,4 +620,19 @@ static ssize_t ofd_per_nid_stats_seq_write(struct file *file, const char *buf,
 }
 
 LPROC_SEQ_FOPS(ofd_per_nid_stats);
 }
 
 LPROC_SEQ_FOPS(ofd_per_nid_stats);
+
+void ofd_stats_counter_init(struct lprocfs_stats *stats)
+{
+       LASSERT(stats && stats->ls_num == LPROC_OFD_STATS_LAST);
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_READ,
+                            LPROCFS_CNTR_AVGMINMAX, "read", "bytes");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_WRITE,
+                            LPROCFS_CNTR_AVGMINMAX, "write", "bytes");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_SETATTR,
+                            0, "setattr", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_PUNCH,
+                            0, "punch", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_SYNC,
+                            0, "sync", "reqs");
+}
 #endif /* LPROCFS */
 #endif /* LPROCFS */
index 7b2710e..a637f03 100644 (file)
@@ -389,8 +389,14 @@ static int ofd_procfs_init(struct ofd_device *ofd)
                       obd->obd_name, rc);
                GOTO(free_obd_stats, rc);
        }
                       obd->obd_name, rc);
                GOTO(free_obd_stats, rc);
        }
-       RETURN(0);
 
 
+       rc = lprocfs_job_stats_init(obd, LPROC_OFD_STATS_LAST,
+                                   ofd_stats_counter_init);
+       if (rc)
+               GOTO(remove_entry_clear, rc);
+       RETURN(0);
+remove_entry_clear:
+       lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
 free_obd_stats:
        lprocfs_free_obd_stats(obd);
 obd_cleanup:
 free_obd_stats:
        lprocfs_free_obd_stats(obd);
 obd_cleanup:
@@ -402,6 +408,7 @@ static int ofd_procfs_fini(struct ofd_device *ofd)
 {
        struct obd_device *obd = ofd_obd(ofd);
 
 {
        struct obd_device *obd = ofd_obd(ofd);
 
+       lprocfs_job_stats_fini(obd);
        lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
        lprocfs_free_per_client_stats(obd);
        lprocfs_free_obd_stats(obd);
        lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
        lprocfs_free_per_client_stats(obd);
        lprocfs_free_obd_stats(obd);
index 2aca2d2..5ab1e61 100644 (file)
@@ -72,6 +72,24 @@ enum {
        LPROC_OFD_LAST,
 };
 
        LPROC_OFD_LAST,
 };
 
+/* for job stats */
+enum {
+       LPROC_OFD_STATS_READ = 0,
+       LPROC_OFD_STATS_WRITE = 1,
+       LPROC_OFD_STATS_SETATTR = 2,
+       LPROC_OFD_STATS_PUNCH = 3,
+       LPROC_OFD_STATS_SYNC = 4,
+       LPROC_OFD_STATS_LAST,
+};
+
+static inline void ofd_counter_incr(struct obd_export *exp, int opcode,
+                                   char *jobid, long amount)
+{
+       if (exp->exp_obd && exp->exp_obd->u.obt.obt_jobstats.ojs_hash &&
+           (exp->exp_connect_flags & OBD_CONNECT_JOBSTATS))
+               lprocfs_job_stats_log(exp->exp_obd, jobid, opcode, amount);
+}
+
 struct ofd_device {
        struct dt_device         ofd_dt_dev;
        struct dt_device        *ofd_osd;
 struct ofd_device {
        struct dt_device         ofd_dt_dev;
        struct dt_device        *ofd_osd;
@@ -329,9 +347,19 @@ int ofd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                    void *cookie);
 
 /* lproc_ofd.c */
                    void *cookie);
 
 /* lproc_ofd.c */
+#ifdef LPROCFS
 void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars);
 int lproc_ofd_attach_seqstat(struct obd_device *dev);
 extern struct file_operations ofd_per_nid_stats_fops;
 void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars);
 int lproc_ofd_attach_seqstat(struct obd_device *dev);
 extern struct file_operations ofd_per_nid_stats_fops;
+void ofd_stats_counter_init(struct lprocfs_stats *stats);
+#else
+static void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars)
+{
+       memset(lvars, 0, sizeof(*lvars));
+}
+static inline int lproc_ofd_attach_seqstat(struct obd_device *dev) {}
+static inline void ofd_stats_counter_init(struct lprocfs_stats *stats) {}
+#endif
 
 /* ofd_objects.c */
 struct ofd_object *ofd_object_find(const struct lu_env *env,
 
 /* ofd_objects.c */
 struct ofd_object *ofd_object_find(const struct lu_env *env,
index 320deab..1193547 100644 (file)
 
 #include "ofd_internal.h"
 
 
 #include "ofd_internal.h"
 
-static int ofd_preprw_read(const struct lu_env *env, struct ofd_device *ofd,
-                          struct lu_fid *fid, struct lu_attr *la, int niocount,
+static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
+                          struct ofd_device *ofd, struct lu_fid *fid,
+                          struct lu_attr *la, int niocount,
                           struct niobuf_remote *rnb, int *nr_local,
                           struct niobuf_remote *rnb, int *nr_local,
-                          struct niobuf_local *lnb)
+                          struct niobuf_local *lnb,
+                          struct obd_trans_info *oti)
 {
        struct ofd_object       *fo;
        int                      i, j, rc, tot_bytes = 0;
 {
        struct ofd_object       *fo;
        int                      i, j, rc, tot_bytes = 0;
@@ -86,6 +88,8 @@ static int ofd_preprw_read(const struct lu_env *env, struct ofd_device *ofd,
                GOTO(buf_put, rc);
        lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
                            LPROC_OFD_READ_BYTES, tot_bytes);
                GOTO(buf_put, rc);
        lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
                            LPROC_OFD_READ_BYTES, tot_bytes);
+       ofd_counter_incr(exp, LPROC_OFD_STATS_READ,
+                        oti->oti_jobid, tot_bytes);
        RETURN(0);
 
 buf_put:
        RETURN(0);
 
 buf_put:
@@ -178,23 +182,26 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        *nr_local = j;
        LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
 
        *nr_local = j;
        LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
 
-       lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
-                           LPROC_OFD_WRITE_BYTES, tot_bytes);
        rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc != 0)) {
                dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
                ofd_read_unlock(env, fo);
                /* ofd_grant_prepare_write() was called, so we must commit */
                ofd_grant_commit(env, exp, rc);
        rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc != 0)) {
                dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
                ofd_read_unlock(env, fo);
                /* ofd_grant_prepare_write() was called, so we must commit */
                ofd_grant_commit(env, exp, rc);
+               GOTO(out, rc);
        }
 
        }
 
-       RETURN(rc);
+       lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
+                           LPROC_OFD_WRITE_BYTES, tot_bytes);
+       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE,
+                        oti->oti_jobid, tot_bytes);
+       RETURN(0);
 out:
        /* let's still process incoming grant information packed in the oa,
         * but without enforcing grant since we won't proceed with the write.
         * Just like a read request actually. */
        ofd_grant_prepare_read(env, exp, oa);
 out:
        /* let's still process incoming grant information packed in the oa,
         * but without enforcing grant since we won't proceed with the write.
         * Just like a read request actually. */
        ofd_grant_prepare_read(env, exp, oa);
-       RETURN(rc);
+       return rc;
 }
 
 int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
 }
 
 int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
@@ -236,9 +243,9 @@ int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
                                   capa, CAPA_OPC_OSS_READ);
                if (rc == 0) {
                        ofd_grant_prepare_read(env, exp, oa);
                                   capa, CAPA_OPC_OSS_READ);
                if (rc == 0) {
                        ofd_grant_prepare_read(env, exp, oa);
-                       rc = ofd_preprw_read(env, ofd, &info->fti_fid,
+                       rc = ofd_preprw_read(env, exp, ofd, &info->fti_fid,
                                             &info->fti_attr, obj->ioo_bufcnt,
                                             &info->fti_attr, obj->ioo_bufcnt,
-                                            rnb, nr_local, lnb);
+                                            rnb, nr_local, lnb, oti);
                        obdo_from_la(oa, &info->fti_attr, LA_ATIME);
                }
        } else {
                        obdo_from_la(oa, &info->fti_attr, LA_ATIME);
                }
        } else {
index 820cfff..5efa4a4 100644 (file)
@@ -835,10 +835,13 @@ int ofd_setattr(const struct lu_env *env, struct obd_export *exp,
        obdo_from_la(oinfo->oi_oa, &info->fti_attr,
                     OFD_VALID_FLAGS | LA_UID | LA_GID);
        ofd_info2oti(info, oti);
        obdo_from_la(oinfo->oi_oa, &info->fti_attr,
                     OFD_VALID_FLAGS | LA_UID | LA_GID);
        ofd_info2oti(info, oti);
+
+       ofd_counter_incr(exp, LPROC_OFD_STATS_SETATTR, oti->oti_jobid, 1);
+       EXIT;
 out_unlock:
        ofd_object_put(env, fo);
 out:
 out_unlock:
        ofd_object_put(env, fo);
 out:
-       RETURN(rc);
+       return rc;
 }
 
 static int ofd_punch(const struct lu_env *env, struct obd_export *exp,
 }
 
 static int ofd_punch(const struct lu_env *env, struct obd_export *exp,
@@ -916,10 +919,13 @@ static int ofd_punch(const struct lu_env *env, struct obd_export *exp,
        obdo_from_la(oinfo->oi_oa, &info->fti_attr,
                     OFD_VALID_FLAGS | LA_UID | LA_GID);
        ofd_info2oti(info, oti);
        obdo_from_la(oinfo->oi_oa, &info->fti_attr,
                     OFD_VALID_FLAGS | LA_UID | LA_GID);
        ofd_info2oti(info, oti);
+
+       ofd_counter_incr(exp, LPROC_OFD_STATS_PUNCH, oti->oti_jobid, 1);
+       EXIT;
 out:
        ofd_object_put(env, fo);
 out_env:
 out:
        ofd_object_put(env, fo);
 out_env:
-       RETURN(rc);
+       return rc;
 }
 
 static int ofd_destroy_by_fid(const struct lu_env *env,
 }
 
 static int ofd_destroy_by_fid(const struct lu_env *env,
@@ -1103,10 +1109,10 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                    oa->o_id > ofd_last_id(ofd, oa->o_seq)) {
                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
                                        oa->o_id, ofd_last_id(ofd, oa->o_seq));
                    oa->o_id > ofd_last_id(ofd, oa->o_seq)) {
                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
                                        oa->o_id, ofd_last_id(ofd, oa->o_seq));
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_nolock, rc = -EINVAL);
                }
                /* do nothing because we create objects during first write */
                }
                /* do nothing because we create objects during first write */
-               GOTO(out, rc = 0);
+               GOTO(out_nolock, rc = 0);
        }
        /* former ofd_handle_precreate */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
        }
        /* former ofd_handle_precreate */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
@@ -1115,7 +1121,7 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
                               ofd_obd(ofd)->obd_name);
                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
                               ofd_obd(ofd)->obd_name);
-                       GOTO(out, rc = 0);
+                       GOTO(out_nolock, rc = 0);
                }
                /* This causes inflight precreates to abort and drop lock */
                cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
                }
                /* This causes inflight precreates to abort and drop lock */
                cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress);
@@ -1200,6 +1206,7 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
        ofd_info2oti(info, oti);
 out:
        cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]);
        ofd_info2oti(info, oti);
 out:
        cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]);
+out_nolock:
        if (rc == 0 && ea != NULL) {
                struct lov_stripe_md *lsm = *ea;
 
        if (rc == 0 && ea != NULL) {
                struct lov_stripe_md *lsm = *ea;
 
@@ -1285,13 +1292,18 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp,
        if (!ofd_object_exists(fo))
                GOTO(unlock, rc = -ENOENT);
 
        if (!ofd_object_exists(fo))
                GOTO(unlock, rc = -ENOENT);
 
-       rc = dt_object_sync(env, ofd_object_child(fo));
-       if (rc)
-               GOTO(unlock, rc);
+       if (dt_version_get(env, ofd_object_child(fo)) >
+           ofd_obd(ofd)->obd_last_committed) {
+               rc = dt_object_sync(env, ofd_object_child(fo));
+               if (rc)
+                       GOTO(unlock, rc);
+       }
 
        oinfo->oi_oa->o_valid = OBD_MD_FLID;
        rc = ofd_attr_get(env, fo, &info->fti_attr);
        obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS);
 
        oinfo->oi_oa->o_valid = OBD_MD_FLID;
        rc = ofd_attr_get(env, fo, &info->fti_attr);
        obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS);
+
+       ofd_counter_incr(exp, LPROC_OFD_STATS_SYNC, oinfo->oi_jobid, 1);
        EXIT;
 unlock:
        ofd_write_unlock(env, fo);
        EXIT;
 unlock:
        ofd_write_unlock(env, fo);
@@ -1360,17 +1372,23 @@ static int ofd_ping(const struct lu_env *env, struct obd_export *exp)
        return 0;
 }
 
        return 0;
 }
 
-static int ofd_health_check(const struct lu_env *env, struct obd_device *obd)
+static int ofd_health_check(const struct lu_env *nul, struct obd_device *obd)
 {
        struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
        struct ofd_thread_info  *info;
 {
        struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
        struct ofd_thread_info  *info;
+       struct lu_env            env;
 #ifdef USE_HEALTH_CHECK_WRITE
        struct thandle          *th;
 #endif
        int                      rc = 0;
 
 #ifdef USE_HEALTH_CHECK_WRITE
        struct thandle          *th;
 #endif
        int                      rc = 0;
 
-       info = ofd_info_init(env, NULL);
-       rc = dt_statfs(env, ofd->ofd_osd, &info->fti_u.osfs);
+       /* obd_proc_read_health pass NULL env, we need real one */
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+
+       info = ofd_info_init(&env, NULL);
+       rc = dt_statfs(&env, ofd->ofd_osd, &info->fti_u.osfs);
        if (unlikely(rc))
                GOTO(out, rc);
 
        if (unlikely(rc))
                GOTO(out, rc);
 
@@ -1385,27 +1403,28 @@ static int ofd_health_check(const struct lu_env *env, struct obd_device *obd)
        info->fti_buf.lb_len = CFS_PAGE_SIZE;
        info->fti_off = 0;
 
        info->fti_buf.lb_len = CFS_PAGE_SIZE;
        info->fti_off = 0;
 
-       th = dt_trans_create(env, ofd->ofd_osd);
+       th = dt_trans_create(&env, ofd->ofd_osd);
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, ofd->ofd_health_check_file,
+       rc = dt_declare_record_write(&env, ofd->ofd_health_check_file,
                                     info->fti_buf.lb_len, info->fti_off, th);
        if (rc == 0) {
                th->th_sync = 1; /* sync IO is needed */
                                     info->fti_buf.lb_len, info->fti_off, th);
        if (rc == 0) {
                th->th_sync = 1; /* sync IO is needed */
-               rc = dt_trans_start_local(env, ofd->ofd_osd, th);
+               rc = dt_trans_start_local(&env, ofd->ofd_osd, th);
                if (rc == 0)
                if (rc == 0)
-                       rc = dt_record_write(env, ofd->ofd_health_check_file,
+                       rc = dt_record_write(&env, ofd->ofd_health_check_file,
                                             &info->fti_buf, &info->fti_off,
                                             th);
        }
                                             &info->fti_buf, &info->fti_off,
                                             th);
        }
-       dt_trans_stop(env, ofd->ofd_osd, th);
+       dt_trans_stop(&env, ofd->ofd_osd, th);
 
        OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
 
        CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc);
 #endif
 out:
 
        OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE);
 
        CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc);
 #endif
 out:
+       lu_env_fini(&env);
        return !!rc;
 }
 
        return !!rc;
 }