Whamcloud - gitweb
LU-694 ptlrpc: Job Stats
authorNiu Yawei <niu@whamcloud.com>
Mon, 24 Oct 2011 15:14:09 +0000 (08:14 -0700)
committerOleg Drokin <green@whamcloud.com>
Wed, 30 May 2012 09:57:17 +0000 (05:57 -0400)
This feature collects filesystem operation statistics for the jobs
running on Lustre.

When a job scheduler (SLURM, for instance) is running on a Lustre
client, the client packs the job ID into each request (open, unlink,
write...), and the server collects this information and exposes it
via procfs.

- A 'pb_jobid' is added in 'ptlrpc_body' for storing the jobID;
- The job stats can be accessed from the proc file 'job_stats' in
  each mdt & obdfilter proc entry;
- 'job_cleanup_interval' is created in each mdt & obdfilter proc
  entry for tuning the stats cleanup interval;
- 'jobid_var' is created in the llite proc entry for configuring the
  jobID environment variable name.

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I35c21f93d7ce1ce648504ce437fcfd374f891453
Reviewed-on: http://review.whamcloud.com/1397
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
39 files changed:
libcfs/include/libcfs/curproc.h
libcfs/libcfs/linux/linux-curproc.c
lustre/autoconf/lustre-core.m4
lustre/include/cl_object.h
lustre/include/lprocfs_status.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/lclient/lcommon_cl.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/vvp_io.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/obdclass/Makefile.in
lustre/obdclass/autoMakefile.am
lustre/obdclass/class_obd.c
lustre/obdclass/lprocfs_jobstats.c [new file with mode: 0644]
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/wiretest.c
lustre/tests/sanity.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 777e424..d5530fc 100644 (file)
@@ -73,6 +73,7 @@ char  *cfs_curproc_comm(void);
 
 /* check if task is running in compat mode.*/
 int cfs_curproc_is_32bit(void);
+int cfs_get_environ(const char *key, char *value, int *val_len);
 #endif
 
 typedef __u32 cfs_cap_t;
index dd44fbe..f675eac 100644 (file)
@@ -213,6 +213,158 @@ int cfs_curproc_is_32bit(void)
 #endif
 }
 
+static int cfs_access_process_vm(struct task_struct *tsk, unsigned long addr,
+                                void *buf, int len, int write)
+{
+#ifdef HAVE_ACCESS_PROCESS_VM
+       return access_process_vm(tsk, addr, buf, len, write);
+#else
+       /* Fallback copied from the kernel, page by page, for kernels that
+        * do not export access_process_vm(); returns bytes transferred. */
+       struct mm_struct *mm;
+       struct vm_area_struct *vma;
+       struct page *page;
+       void *old_buf = buf;
+
+       mm = get_task_mm(tsk);
+       if (!mm)
+               return 0;
+
+       down_read(&mm->mmap_sem);
+       /* ignore errors, just check how much was successfully transferred */
+       while (len) {
+               int bytes, ret, offset;
+               void *maddr;
+
+               ret = get_user_pages(tsk, mm, addr, 1,
+                                    write, 1, &page, &vma);
+               if (ret <= 0)
+                       break;
+
+               bytes = len;
+               offset = addr & (PAGE_SIZE-1);
+               if (bytes > PAGE_SIZE-offset)
+                       bytes = PAGE_SIZE-offset;
+
+               maddr = kmap(page);
+               if (write) {
+                       copy_to_user_page(vma, page, addr,
+                                         maddr + offset, buf, bytes);
+                       set_page_dirty_lock(page);
+               } else {
+                       copy_from_user_page(vma, page, addr,
+                                           buf, maddr + offset, bytes);
+               }
+               kunmap(page);
+               page_cache_release(page);
+               len -= bytes;
+               buf += bytes;
+               addr += bytes;
+       }
+       up_read(&mm->mmap_sem);
+       mmput(mm);
+
+       return buf - old_buf;
+#endif /* HAVE_ACCESS_PROCESS_VM */
+}
+
+/* Read the environment variable of current process specified by @key.
+ * On success, copy its value (not NUL-terminated) to @value, store the
+ * value length in *@val_len and return 0; else return a negative errno. */
+int cfs_get_environ(const char *key, char *value, int *val_len)
+{
+       struct mm_struct *mm;
+       char *buffer;
+       int buf_len = CFS_PAGE_SIZE;
+       int key_len = strlen(key);
+       unsigned long addr;
+       int ret = -ENOENT;
+       ENTRY;
+
+       buffer = (char *)cfs_alloc(buf_len, CFS_ALLOC_USER);
+       if (!buffer)
+               RETURN(-ENOMEM);
+
+       mm = get_task_mm(current);
+       if (!mm) {
+               cfs_free((void *)buffer);
+               RETURN(-EINVAL);
+       }
+
+       addr = mm->env_start;
+       while (addr < mm->env_end) {
+               int this_len, retval, scan_len;
+               char *env_start, *env_end;
+
+               memset(buffer, 0, buf_len);
+               this_len = min((int)(mm->env_end - addr), buf_len);
+               retval = cfs_access_process_vm(current, addr, buffer,
+                                              this_len, 0);
+               if (retval != this_len)
+                       break;
+
+               addr += retval;
+
+               /* Parse the buffer to find out the specified key/value pair.
+                * The "key=value" entries are separated by '\0'. */
+               env_start = buffer;
+               scan_len = this_len;
+               while (scan_len) {
+                       char *entry;
+                       int entry_len;
+
+                       env_end = (char *)memscan(env_start, '\0', scan_len);
+                       LASSERT(env_end >= env_start &&
+                               env_end <= env_start + scan_len);
+
+                       /* The last entry of this buffer crosses the buffer
+                        * boundary, reread it in next cycle. */
+                       if (unlikely(env_end - env_start == scan_len)) {
+                               /* This entry is too large to fit in buffer */
+                               if (unlikely(scan_len == this_len)) {
+                                       CERROR("Too long env variable.\n");
+                                       ret = -EINVAL;
+                                       goto out;
+                               }
+                               addr -= scan_len;
+                               break;
+                       }
+
+                       entry = env_start;
+                       entry_len = env_end - env_start;
+
+                       /* Match "key=" exactly: a prefix-only compare would
+                        * also accept longer names ("FOOBAR" for "FOO"). */
+                       if (entry_len > key_len + 1 &&
+                           entry[key_len] == '=' &&
+                           !memcmp(entry, key, key_len)) {
+                               entry += key_len + 1;
+                               entry_len -= key_len + 1;
+                               /* The 'value' buffer passed in is too small.*/
+                               if (entry_len >= *val_len) {
+                                       CERROR("Buffer is too small. "
+                                              "entry_len=%d buffer_len=%d\n",
+                                              entry_len, *val_len);
+                                       ret = -EOVERFLOW;
+                               } else {
+                                       memcpy(value, entry, entry_len);
+                                       *val_len = entry_len;
+                                       ret = 0;
+                               }
+                               goto out;
+                       }
+
+                       scan_len -= (env_end - env_start + 1);
+                       env_start = env_end + 1;
+               }
+       }
+out:
+       mmput(mm);
+       cfs_free((void *)buffer);
+       RETURN(ret);
+}
+EXPORT_SYMBOL(cfs_get_environ);
+
 EXPORT_SYMBOL(cfs_curproc_uid);
 EXPORT_SYMBOL(cfs_curproc_pid);
 EXPORT_SYMBOL(cfs_curproc_euid);
index 11b582a..31b0fd6 100644 (file)
@@ -1796,6 +1796,16 @@ AC_DEFUN([LC_EXPORT_GENERIC_ERROR_REMOVE_PAGE],
          ]
 )
 
+# 2.6.32: check whether the kernel exports access_process_vm().
+AC_DEFUN([LC_EXPORT_ACCESS_PROCESS_VM],
+        [LB_CHECK_SYMBOL_EXPORT([access_process_vm],
+                        [mm/memory.c],
+                        [AC_DEFINE(HAVE_ACCESS_PROCESS_VM, 1,
+                                [access_process_vm function is present])],
+                        [])
+        ]
+)
+
 #
 # 2.6.36 fs_struct.lock use spinlock instead of rwlock.
 #
@@ -2126,6 +2136,7 @@ AC_DEFUN([LC_PROG_LINUX],
          LC_CACHE_UPCALL
          LC_EXPORT_GENERIC_ERROR_REMOVE_PAGE
          LC_SELINUX_IS_ENABLED
+         LC_EXPORT_ACCESS_PROCESS_VM
 
          # 2.6.35, 3.0.0
          LC_FILE_FSYNC
index 59df479..b88655b 100644 (file)
@@ -2378,10 +2378,12 @@ struct cl_io {
  * Per-transfer attributes.
  */
 struct cl_req_attr {
-        /** Generic attributes for the server consumption. */
-        struct obdo     *cra_oa;
-        /** Capability. */
-        struct obd_capa *cra_capa;
+       /** Generic attributes for the server consumption. */
+       struct obdo     *cra_oa;
+       /** Capability. */
+       struct obd_capa *cra_capa;
+       /** Job ID string (copied from lli_jobid in ccc_req_attr_set()). */
+       char             cra_jobid[JOBSTATS_JOBID_SIZE];
 };
 
 /**
index b6493ca..97336f8 100644 (file)
@@ -350,6 +350,21 @@ static inline void s2dhms(struct dhms *ts, time_t secs)
 #define DHMS_FMT "%dd%dh%02dm%02ds"
 #define DHMS_VARS(x) (x)->d, (x)->h, (x)->m, (x)->s
 
+#define JOBSTATS_JOBID_VAR_MAX_LEN     20
+#define JOBSTATS_DISABLE               "disable"
+#define JOBSTATS_PROCNAME_UID          "procname_uid"
+
+typedef void (*cntr_init_callback)(struct lprocfs_stats *stats);
+
+struct obd_job_stats {
+       cfs_hash_t        *ojs_hash;
+       cfs_list_t         ojs_list;
+       cfs_rwlock_t       ojs_lock; /* protect the ojs_list */
+       int                ojs_cntr_num;
+       cntr_init_callback ojs_cntr_init_fn;
+       cfs_timer_t        ojs_cleanup_timer;
+       int                ojs_cleanup_interval;
+};
 
 #ifdef LPROCFS
 
@@ -649,6 +664,17 @@ struct file_operations name##_fops = {                                     \
 #define LPROC_SEQ_FOPS_RO(name)         __LPROC_SEQ_FOPS(name, NULL)
 #define LPROC_SEQ_FOPS(name)            __LPROC_SEQ_FOPS(name, name##_seq_write)
 
+/* lprocfs_jobstats.c */
+int lprocfs_job_stats_log(struct obd_device *obd, char *jobid,
+                         int event, long amount);
+void lprocfs_job_stats_fini(struct obd_device *obd);
+int lprocfs_job_stats_init(struct obd_device *obd, int cntr_num,
+                          cntr_init_callback fn);
+int lprocfs_rd_job_interval(char *page, char **start, off_t off,
+                           int count, int *eof, void *data);
+int lprocfs_wr_job_interval(struct file *file, const char *buffer,
+                           unsigned long count, void *data);
+
 /* lproc_ptlrpc.c */
 struct ptlrpc_request;
 extern void target_print_req(void *seq_file, struct ptlrpc_request *req);
@@ -952,6 +978,20 @@ __u64 lprocfs_stats_collector(struct lprocfs_stats *stats, int idx,
 #define LPROC_SEQ_FOPS_RO(name)
 #define LPROC_SEQ_FOPS(name)
 
+/* lprocfs_jobstats.c: no-op stubs of the job stats API */
+static inline
+int lprocfs_job_stats_log(struct obd_device *obd, char *jobid, int event,
+                         long amount)
+{ return 0; }
+static inline
+void lprocfs_job_stats_fini(struct obd_device *obd)
+{ return; }
+static inline
+int lprocfs_job_stats_init(struct obd_device *obd, int cntr_num,
+                          cntr_init_callback fn)
+{ return 0; }
+
+
 /* lproc_ptlrpc.c */
 #define target_print_req NULL
 
index fa09ec4..aa15515 100644 (file)
@@ -994,7 +994,33 @@ struct lustre_msg_v2 {
 
 /* without gss, ptlrpc_body is put at the first buffer. */
 #define PTLRPC_NUM_VERSIONS     4
-struct ptlrpc_body {
+#define JOBSTATS_JOBID_SIZE     32  /* 32 bytes string */
+struct ptlrpc_body_v3 {
+       struct lustre_handle pb_handle;
+       __u32 pb_type;
+       __u32 pb_version;
+       __u32 pb_opc;
+       __u32 pb_status;
+       __u64 pb_last_xid;
+       __u64 pb_last_seen;
+       __u64 pb_last_committed;
+       __u64 pb_transno;
+       __u32 pb_flags;
+       __u32 pb_op_flags;
+       __u32 pb_conn_cnt;
+       __u32 pb_timeout;  /* for req, the deadline, for rep, the service est */
+       __u32 pb_service_time; /* for rep, actual service time */
+       __u32 pb_limit;
+       __u64 pb_slv;
+       /* VBR: pre-versions */
+       __u64 pb_pre_versions[PTLRPC_NUM_VERSIONS];
+       /* padding for future needs */
+       __u64 pb_padding[4];
+       char  pb_jobid[JOBSTATS_JOBID_SIZE]; /* job ID, for job stats */
+};
+#define ptlrpc_body     ptlrpc_body_v3
+
+struct ptlrpc_body_v2 {
         struct lustre_handle pb_handle;
         __u32 pb_type;
         __u32 pb_version;
@@ -1159,7 +1185,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 | \
                                 OBD_CONNECT_SOM | OBD_CONNECT_FULL20 | \
                                 OBD_CONNECT_64BITHASH | \
-                                OBD_CONNECT_EINPROGRESS)
+                               OBD_CONNECT_EINPROGRESS | OBD_CONNECT_JOBSTATS)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
@@ -1174,7 +1200,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_GRANT_SHRINK | OBD_CONNECT_FULL20 | \
                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_MAXBYTES | \
                                 OBD_CONNECT_MAX_EASIZE | \
-                                OBD_CONNECT_EINPROGRESS)
+                               OBD_CONNECT_EINPROGRESS | OBD_CONNECT_JOBSTATS)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \
                                 OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV)
index 3cecb0a..36106fe 100644 (file)
@@ -1690,6 +1690,7 @@ int lustre_msg_is_v1(struct lustre_msg *msg);
 __u32 lustre_msg_get_magic(struct lustre_msg *msg);
 __u32 lustre_msg_get_timeout(struct lustre_msg *msg);
 __u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+char *lustre_msg_get_jobid(struct lustre_msg *msg);
 __u32 lustre_msg_get_cksum(struct lustre_msg *msg);
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 9, 0, 0)
 __u32 lustre_msg_calc_cksum(struct lustre_msg *msg, int compat18);
@@ -1710,6 +1711,7 @@ void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, __u32 *sizes)
 void ptlrpc_request_set_replen(struct ptlrpc_request *req);
 void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
 void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
 void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
 
 static inline void
index 3cf3115..da354e2 100644 (file)
@@ -193,6 +193,8 @@ struct obd_info {
         /* oss capability, its type is obd_capa in client to avoid copy.
          * in contrary its type is lustre_capa in OSS. */
         void                   *oi_capa;
+       /* transfer jobid from ost_sync() to filter_sync()... */
+       char                   *oi_jobid;
 };
 
 /* compare all relevant fields. */
@@ -304,7 +306,8 @@ struct obd_device_target {
         cfs_rw_semaphore_t        obt_rwsem;
         struct vfsmount          *obt_vfsmnt;
         struct file              *obt_health_check_filp;
-        struct osd_properties    obt_osd_properties;
+       struct osd_properties     obt_osd_properties;
+       struct obd_job_stats      obt_jobstats;
 };
 
 /* llog contexts */
@@ -889,6 +892,8 @@ struct obd_trans_info {
         __u32                    oti_conn_cnt;
         /** VBR: versions */
         __u64                    oti_pre_version;
+       /** JobID */
+       char                    *oti_jobid;
 
         struct obd_uuid         *oti_ost_uuid;
 };
index 87a961b..a9f86f8 100644 (file)
@@ -79,6 +79,7 @@ extern cfs_rwlock_t obd_dev_lock;
 extern struct obd_device *class_conn2obd(struct lustre_handle *);
 extern struct obd_device *class_exp2obd(struct obd_export *);
 extern int class_handle_ioctl(unsigned int cmd, unsigned long arg);
+extern int lustre_get_jobid(char *jobid);
 
 struct lu_device_type;
 
index 4243085..39b4728 100644 (file)
@@ -76,6 +76,7 @@ extern unsigned int obd_max_dirty_pages;
 extern cfs_atomic_t obd_dirty_pages;
 extern cfs_atomic_t obd_dirty_transit_pages;
 extern unsigned int obd_alloc_fail_rate;
+extern char obd_jobid_var[];
 
 /* lvfs.c */
 int obd_alloc_fail(const void *ptr, const char *name, const char *type,
@@ -105,6 +106,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define HASH_EXP_LOCK_MAX_BITS  16
 #define HASH_CL_ENV_BKT_BITS    5
 #define HASH_CL_ENV_BITS        10
+#define HASH_JOB_STATS_BKT_BITS 5
+#define HASH_JOB_STATS_CUR_BITS 7
+#define HASH_JOB_STATS_MAX_BITS 12
 
 /* Timeout definitions */
 #define OBD_TIMEOUT_DEFAULT             100
index 462d6e0..05c348d 100644 (file)
@@ -1000,6 +1000,10 @@ void ccc_req_attr_set(const struct lu_env *env,
         }
         obdo_from_inode(oa, inode, valid_flags & flags);
         obdo_set_parent_fid(oa, &cl_i2info(inode)->lli_fid);
+#ifdef __KERNEL__
+       memcpy(attr->cra_jobid, cl_i2info(inode)->lli_jobid,
+              JOBSTATS_JOBID_SIZE);
+#endif
 }
 
 const struct cl_req_operations ccc_req_ops = {
index c4e8f54..0932b3c 100644 (file)
@@ -227,6 +227,15 @@ struct ll_inode_info {
                         cfs_time_t                      f_glimpse_time;
                         cfs_list_t                      f_agl_list;
                         __u64                           f_agl_index;
+                       /*
+                        * whenever a process tries to read/write the file, the
+                        * jobid of the process will be saved here, and it'll
+                        * be packed into the write RPC when flushed later.
+                        *
+                        * so the read/write statistics for jobid will not be
+                        * accurate if the file is shared by different jobs.
+                        */
+                       char                     f_jobid[JOBSTATS_JOBID_SIZE];
                 } f;
 
 #define lli_size_sem            u.f.f_size_sem
@@ -241,6 +250,7 @@ struct ll_inode_info {
 #define lli_glimpse_time        u.f.f_glimpse_time
 #define lli_agl_list            u.f.f_agl_list
 #define lli_agl_index           u.f.f_agl_index
+#define lli_jobid               u.f.f_jobid
 
         } u;
 
index bae0dc2..958cf24 100644 (file)
@@ -214,7 +214,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
                                   OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR    |
                                   OBD_CONNECT_FULL20   | OBD_CONNECT_64BITHASH|
-                                  OBD_CONNECT_EINPROGRESS;
+                                 OBD_CONNECT_EINPROGRESS |
+                                 OBD_CONNECT_JOBSTATS;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
@@ -385,7 +386,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_OSS_CAPA | OBD_CONNECT_VBR|
                                   OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH |
                                   OBD_CONNECT_MAXBYTES |
-                                  OBD_CONNECT_EINPROGRESS;
+                                 OBD_CONNECT_EINPROGRESS |
+                                 OBD_CONNECT_JOBSTATS;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
index b3a7fe1..f6a5fb8 100644 (file)
@@ -650,6 +650,24 @@ static int ll_rd_maxea_size(char *page, char **start, off_t off,
         return snprintf(page, count, "%u\n", ealen);
 }
 
+static int ll_rd_jobidvar(char *page, char **start, off_t off,
+                         int count, int *eof, void *data)
+{      /* Print obd_jobid_var followed by a newline into @page. */
+       return snprintf(page, count, "%s\n", obd_jobid_var);
+}
+/* Set obd_jobid_var from a write of 1..JOBSTATS_JOBID_VAR_MAX_LEN bytes. */
+static int ll_wr_jobidvar(struct file *file, const char *buffer,
+                         unsigned long count, void *data)
+{
+       if (!count || count > JOBSTATS_JOBID_VAR_MAX_LEN)
+               return -EINVAL;
+
+       memset(obd_jobid_var, 0, JOBSTATS_JOBID_VAR_MAX_LEN + 1);
+       /* Trim the trailing '\n' if any. NOTE(review): is buffer __user? */
+       memcpy(obd_jobid_var, buffer, count - (buffer[count - 1] == '\n'));
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -680,6 +698,7 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "statahead_stats",  ll_rd_statahead_stats, 0, 0 },
         { "lazystatfs",       ll_rd_lazystatfs, ll_wr_lazystatfs, 0 },
         { "max_easize",       ll_rd_maxea_size, 0, 0 },
+       { "jobid_var",  ll_rd_jobidvar, ll_wr_jobidvar, 0 },
         { 0 }
 };
 
index f295836..2f83fc2 100644 (file)
@@ -1117,6 +1117,7 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
         result = 0;
         if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
                 size_t count;
+               struct ll_inode_info *lli = ll_i2info(ccc_object_inode(obj));
 
                 count = io->u.ci_rw.crw_count;
                 /* "If nbyte is 0, read() will return 0 and have no other
@@ -1127,6 +1128,13 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
                         cio->cui_tot_count = count;
                         cio->cui_tot_nrsegs = 0;
                 }
+               /* for read/write, we store the jobid in the inode, and
+                * it'll be fetched by osc when building RPC.
+                *
+                * it's not accurate if the file is shared by different
+                * jobs.
+                */
+               lustre_get_jobid(lli->lli_jobid);
         } else if (io->ci_type == CIT_SETATTR) {
                 if (!cl_io_is_trunc(io))
                         io->ci_lockreq = CILR_MANDATORY;
index e50b8a4..9d74ff6 100644 (file)
@@ -368,7 +368,7 @@ static int mdt_statfs(struct mdt_thread_info *info)
         }
 
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_STATFS);
+               mdt_counter_incr(req, LPROC_MDT_STATFS);
 
         RETURN(rc);
 }
@@ -669,7 +669,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
 out:
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
+               mdt_counter_incr(req, LPROC_MDT_GETATTR);
 
         RETURN(rc);
 }
@@ -767,7 +767,6 @@ static int mdt_getattr(struct mdt_thread_info *info)
                 mdt_exit_ucred(info);
         EXIT;
 out_shrink:
-
         mdt_client_compatibility(info);
         rc2 = mdt_fix_reply(info);
         if (rc == 0)
@@ -1744,7 +1743,7 @@ static int mdt_sync(struct mdt_thread_info *info)
                         rc = err_serious(rc);
         }
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_SYNC);
+               mdt_counter_incr(req, LPROC_MDT_SYNC);
 
         RETURN(rc);
 }
index a78eb91..cfa9d3d 100644 (file)
@@ -785,14 +785,15 @@ enum {
         LPROC_MDT_CROSSDIR_RENAME,
         LPROC_MDT_LAST,
 };
-void mdt_counter_incr(struct obd_export *exp, int opcode);
+void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
 void mdt_stats_counter_init(struct lprocfs_stats *stats);
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars);
 int mdt_procfs_init(struct mdt_device *mdt, const char *name);
 int mdt_procfs_fini(struct mdt_device *mdt);
 void mdt_rename_counter_tally(struct mdt_thread_info *info,
-                              struct mdt_device *mdt, struct obd_export *exp,
-                              struct mdt_object *src, struct mdt_object *tgt);
+                             struct mdt_device *mdt,
+                             struct ptlrpc_request *req,
+                             struct mdt_object *src, struct mdt_object *tgt);
 
 void mdt_time_start(const struct mdt_thread_info *info);
 void mdt_time_end(const struct mdt_thread_info *info, int idx);
index fbc91c7..8f77ce9 100644 (file)
@@ -184,10 +184,10 @@ static int lproc_mdt_attach_rename_seqstat(struct mdt_device *mdt)
 }
 
 void mdt_rename_counter_tally(struct mdt_thread_info *info,
-                              struct mdt_device *mdt,
-                              struct obd_export *exp,
-                              struct mdt_object *src,
-                              struct mdt_object *tgt)
+                             struct mdt_device *mdt,
+                             struct ptlrpc_request *req,
+                             struct mdt_object *src,
+                             struct mdt_object *tgt)
 {
         struct md_attr *ma = &info->mti_attr;
         struct rename_stats *rstats = &mdt->mdt_rename_stats;
@@ -198,18 +198,19 @@ void mdt_rename_counter_tally(struct mdt_thread_info *info,
         rc = mo_attr_get(info->mti_env, mdt_object_child(src), ma);
         if (rc) {
                 CERROR("%s: "DFID" attr_get, rc = %d\n",
-                       exp->exp_obd->obd_name, PFID(mdt_object_fid(src)), rc);
+                      req->rq_export->exp_obd->obd_name,
+                      PFID(mdt_object_fid(src)), rc);
                 return;
         }
 
         if (src == tgt) {
-                mdt_counter_incr(exp, LPROC_MDT_SAMEDIR_RENAME);
+               mdt_counter_incr(req, LPROC_MDT_SAMEDIR_RENAME);
                 lprocfs_oh_tally_log2(&rstats->hist[RENAME_SAMEDIR_SIZE],
                                       (unsigned int)ma->ma_attr.la_size);
                 return;
         }
 
-        mdt_counter_incr(exp, LPROC_MDT_CROSSDIR_RENAME);
+       mdt_counter_incr(req, LPROC_MDT_CROSSDIR_RENAME);
         lprocfs_oh_tally_log2(&rstats->hist[RENAME_CROSSDIR_SRC_SIZE],
                               (unsigned int)ma->ma_attr.la_size);
 
@@ -218,7 +219,8 @@ void mdt_rename_counter_tally(struct mdt_thread_info *info,
         rc = mo_attr_get(info->mti_env, mdt_object_child(tgt), ma);
         if (rc) {
                 CERROR("%s: "DFID" attr_get, rc = %d\n",
-                       exp->exp_obd->obd_name, PFID(mdt_object_fid(tgt)), rc);
+                      req->rq_export->exp_obd->obd_name,
+                      PFID(mdt_object_fid(tgt)), rc);
                 return;
         }
 
@@ -264,8 +266,12 @@ int mdt_procfs_init(struct mdt_device *mdt, const char *name)
                                    "clear", lprocfs_nid_stats_clear_read,
                                    lprocfs_nid_stats_clear_write, obd, NULL);
         rc = lprocfs_alloc_md_stats(obd, LPROC_MDT_LAST);
-        if (rc == 0)
-                mdt_stats_counter_init(obd->md_stats);
+       if (rc)
+               return rc;
+       mdt_stats_counter_init(obd->md_stats);
+
+       rc = lprocfs_job_stats_init(obd, LPROC_MDT_LAST,
+                                   mdt_stats_counter_init);
 
         rc = lproc_mdt_attach_rename_seqstat(mdt);
         if (rc)
@@ -280,6 +286,8 @@ int mdt_procfs_fini(struct mdt_device *mdt)
         struct lu_device *ld = &mdt->mdt_md_dev.md_lu_dev;
         struct obd_device *obd = ld->ld_obd;
 
+       lprocfs_job_stats_fini(obd);
+
         if (obd->obd_proc_exports_entry) {
                 lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
                 obd->obd_proc_exports_entry = NULL;
@@ -1013,6 +1021,8 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { "instance",                   lprocfs_target_rd_instance,         0 },
         { "ir_factor",                  lprocfs_obd_rd_ir_factor,
                                         lprocfs_obd_wr_ir_factor,           0 },
+       { "job_cleanup_interval",       lprocfs_rd_job_interval,
+                                       lprocfs_wr_job_interval, 0 },
         { 0 }
 };
 
@@ -1027,13 +1037,19 @@ void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars)
     lvars->obd_vars     = lprocfs_mdt_obd_vars;
 }
 
-void mdt_counter_incr(struct obd_export *exp, int opcode)
+void mdt_counter_incr(struct ptlrpc_request *req, int opcode)
 {
-        if (exp->exp_obd && exp->exp_obd->md_stats)
-                lprocfs_counter_incr(exp->exp_obd->md_stats, opcode);
-        if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
-                lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
-
+       struct obd_export *exp = req->rq_export;
+       /* Bump @opcode in the device, per-NID and, if enabled, per-job stats */
+       if (exp->exp_obd && exp->exp_obd->md_stats)
+               lprocfs_counter_incr(exp->exp_obd->md_stats, opcode);
+       if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
+               lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
+       if (exp->exp_obd && exp->exp_obd->u.obt.obt_jobstats.ojs_hash &&
+           (exp->exp_connect_flags & OBD_CONNECT_JOBSTATS))
+               lprocfs_job_stats_log(exp->exp_obd,
+                                     lustre_msg_get_jobid(req->rq_reqmsg),
+                                     opcode, 1);
 }
 
 void mdt_stats_counter_init(struct lprocfs_stats *stats)
index 16fbdd9..2af48b3 100644 (file)
@@ -1245,7 +1245,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
                                (obd_timeout + 1) / 4);
 
-        mdt_counter_incr(req->rq_export, LPROC_MDT_OPEN);
+       mdt_counter_incr(req, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
@@ -1610,7 +1610,7 @@ int mdt_close(struct mdt_thread_info *info)
         int rc, ret = 0;
         ENTRY;
 
-        mdt_counter_incr(req->rq_export, LPROC_MDT_CLOSE);
+       mdt_counter_incr(req, LPROC_MDT_CLOSE);
         /* Close may come with the Size-on-MDS update. Unpack it. */
         rc = mdt_close_unpack(info);
         if (rc)
index 3470843..222487f 100644 (file)
@@ -595,7 +595,7 @@ out_put:
         mdt_object_put(info->mti_env, mo);
 out:
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR);
+               mdt_counter_incr(req, LPROC_MDT_SETATTR);
 
         mdt_client_compatibility(info);
         rc2 = mdt_fix_reply(info);
@@ -625,7 +625,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
                         rc = mdt_md_mkobj(info);
                 } else {
                         LASSERT(info->mti_rr.rr_namelen > 0);
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_MKDIR);
+                       mdt_counter_incr(req, LPROC_MDT_MKDIR);
                         rc = mdt_md_create(info);
                 }
                 break;
@@ -638,7 +638,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         case S_IFSOCK:{
                 /* Special file should stay on the same node as parent. */
                 LASSERT(info->mti_rr.rr_namelen > 0);
-                mdt_counter_incr(req->rq_export, LPROC_MDT_MKNOD);
+               mdt_counter_incr(req, LPROC_MDT_MKNOD);
                 rc = mdt_md_create(info);
                 break;
         }
@@ -770,7 +770,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (ma->ma_valid & MA_INODE) {
                 switch (ma->ma_attr.la_mode & S_IFMT) {
                 case S_IFDIR:
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_RMDIR);
+                       mdt_counter_incr(req, LPROC_MDT_RMDIR);
                         break;
                 case S_IFREG:
                 case S_IFLNK:
@@ -778,7 +778,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 case S_IFBLK:
                 case S_IFIFO:
                 case S_IFSOCK:
-                        mdt_counter_incr(req->rq_export, LPROC_MDT_UNLINK);
+                       mdt_counter_incr(req, LPROC_MDT_UNLINK);
                         break;
                 default:
                         LASSERTF(0, "bad file type %o unlinking\n",
@@ -897,7 +897,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                       mdt_object_child(ms), lname, ma);
 
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_LINK);
+               mdt_counter_incr(req, LPROC_MDT_LINK);
 
         EXIT;
 out_unlock_child:
@@ -1292,11 +1292,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         /* handle last link of tgt object */
         if (rc == 0) {
-                mdt_counter_incr(req->rq_export, LPROC_MDT_RENAME);
+               mdt_counter_incr(req, LPROC_MDT_RENAME);
                 if (mnew)
                         mdt_handle_last_unlink(info, mnew, ma);
 
-                mdt_rename_counter_tally(info, info->mti_mdt, req->rq_export,
+               mdt_rename_counter_tally(info, info->mti_mdt, req,
                                          msrcdir, mtgtdir);
         }
 
index 1a36e30..62e2622 100644 (file)
@@ -213,7 +213,7 @@ int mdt_getxattr(struct mdt_thread_info *info)
         EXIT;
 out:
         if (rc >= 0) {
-                mdt_counter_incr(req->rq_export, LPROC_MDT_GETXATTR);
+               mdt_counter_incr(req, LPROC_MDT_GETXATTR);
                 repbody->eadatasize = rc;
                 rc = 0;
         }
@@ -417,7 +417,7 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
                 rc = -EINVAL;
         }
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_SETXATTR);
+               mdt_counter_incr(req, LPROC_MDT_SETXATTR);
 
         EXIT;
 out_unlock:
index 2865539..3a6944c 100644 (file)
@@ -9,7 +9,7 @@ sources:
 
 obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o
 obdclass-all-objs += class_obd.o debug.o genops.o uuid.o llog_ioctl.o
-obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o
+obdclass-all-objs += lprocfs_status.o lprocfs_jobstats.o lustre_handles.o lustre_peer.o
 obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o
 obdclass-all-objs += lu_object.o dt_object.o capa.o lu_time.o
 obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o
index 9e15f8c..f7cc4d1 100644 (file)
@@ -7,12 +7,12 @@ DIST_SUBDIRS := linux darwin
 if LIBLUSTRE
 INCLUDES = -I$(SYSIO)/include
 noinst_LIBRARIES = liblustreclass.a
-liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c 
+liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c
 liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c
 liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c 
 liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c capa.c
 liblustreclass_a_SOURCES += lu_object.c cl_object.c lu_time.c lu_ref.c
-liblustreclass_a_SOURCES += cl_page.c cl_lock.c cl_io.c
+liblustreclass_a_SOURCES += cl_page.c cl_lock.c cl_io.c lprocfs_jobstats.c
 liblustreclass_a_SOURCES += #llog_ioctl.c rbtree.c
 liblustreclass_a_CPPFLAGS = $(LLCPPFLAGS)
 liblustreclass_a_CFLAGS = $(LLCFLAGS)
index e20fd67..f075104 100644 (file)
@@ -86,6 +86,55 @@ int at_extra = 30;
 cfs_atomic_t obd_dirty_pages;
 cfs_atomic_t obd_dirty_transit_pages;
 
+/* Selects how lustre_get_jobid() derives the jobid: JOBSTATS_DISABLE (the
+ * initial value) turns it off, JOBSTATS_PROCNAME_UID selects "comm_fsuid",
+ * anything else is used as the name of an environment variable to read. */
+char obd_jobid_var[JOBSTATS_JOBID_VAR_MAX_LEN + 1] = JOBSTATS_DISABLE;
+EXPORT_SYMBOL(obd_jobid_var);
+
+/* Get jobid of current process by reading the environment variable
+ * stored in between the "env_start" & "env_end" of task struct.
+ *
+ * \param[out] jobid  caller-supplied buffer of at least JOBSTATS_JOBID_SIZE
+ *                    bytes; it is always zeroed first, so it holds an empty
+ *                    string when jobstats is disabled or the lookup fails
+ *                    before writing anything.
+ *
+ * \retval 0          jobstats disabled, procname_uid mode, or the variable
+ *                    named by obd_jobid_var was read successfully
+ * \retval non-zero   whatever cfs_get_environ() returned (kernel build only)
+ *
+ * TODO:
+ * It's better to cache the jobid for later use if there is any
+ * efficient way, the cl_env code probably could be reused for this
+ * purpose.
+ *
+ * If some job scheduler doesn't store jobid in the "env_start/end",
+ * then an upcall could be issued here to get the jobid by utilizing
+ * the userspace tools/api. Then, the jobid must be cached.
+ */
+int lustre_get_jobid(char *jobid)
+{
+#ifdef __KERNEL__
+       int jobid_len = JOBSTATS_JOBID_SIZE;
+#endif
+       int ret = 0;
+       ENTRY;
+
+       memset(jobid, 0, JOBSTATS_JOBID_SIZE);
+       /* Jobstats isn't enabled */
+       /* NOTE(review): memcmp() over strlen(JOBSTATS_DISABLE) is a prefix
+        * match, so any obd_jobid_var value that merely starts with the
+        * keyword also disables jobstats - confirm this is intended. */
+       if (!memcmp(obd_jobid_var, JOBSTATS_DISABLE,
+                   strlen(JOBSTATS_DISABLE))) {
+               RETURN(0);
+       }
+
+       /* Use process name + fsuid as jobid */
+       /* NOTE(review): prefix match here as well, see above. */
+       if (!memcmp(obd_jobid_var, JOBSTATS_PROCNAME_UID,
+                   strlen(JOBSTATS_PROCNAME_UID))) {
+               snprintf(jobid, JOBSTATS_JOBID_SIZE, "%s_%u",
+                        cfs_curproc_comm(), cfs_curproc_fsuid());
+               RETURN(0);
+       }
+
+#ifdef __KERNEL__
+       /* Otherwise obd_jobid_var names the environment variable to read. */
+       ret = cfs_get_environ(obd_jobid_var, jobid, &jobid_len);
+       if (ret) {
+               /* -ENOENT and -EINVAL are logged at D_INFO only; every other
+                * failure is reported at D_ERROR. */
+               CDEBUG((ret != -ENOENT && ret != -EINVAL) ? D_ERROR : D_INFO,
+                      "Get jobid for (%s) failed(%d).\n", obd_jobid_var, ret);
+       }
+#endif
+       RETURN(ret);
+}
+EXPORT_SYMBOL(lustre_get_jobid);
+
 static inline void obd_data2conn(struct lustre_handle *conn,
                                  struct obd_ioctl_data *data)
 {
@@ -365,7 +414,6 @@ EXPORT_SYMBOL(at_extra);
 EXPORT_SYMBOL(at_early_margin);
 EXPORT_SYMBOL(at_history);
 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
-
 EXPORT_SYMBOL(proc_lustre_root);
 
 /* uuid.c */
diff --git a/lustre/obdclass/lprocfs_jobstats.c b/lustre/obdclass/lprocfs_jobstats.c
new file mode 100644 (file)
index 0000000..0072feb
--- /dev/null
@@ -0,0 +1,571 @@
+/* GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Niu Yawei <niu@whamcloud.com>
+ */
+/*
+ * lustre/obdclass/lprocfs_jobstats.c
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#ifndef __KERNEL__
+# include <liblustre.h>
+#endif
+
+#include <obd_class.h>
+#include <lprocfs_status.h>
+#include <lustre/lustre_idl.h>
+
+#if defined(LPROCFS)
+
+/*
+ * JobID formats & JobID environment variable names for supported
+ * job schedulers:
+ *
+ * SLURM:
+ *   JobID format:  32 bit integer.
+ *   JobID env var: SLURM_JOB_ID.
+ * SGE:
+ *   JobID format:  Decimal integer range to 99999.
+ *   JobID env var: JOB_ID.
+ * LSF:
+ *   JobID format:  6 digit integer by default (up to 999999), can be
+ *               increased to 10 digit (up to 2147483646).
+ *   JobID env var: LSB_JOBID.
+ * Loadleveler:
+ *   JobID format:  String of machine_name.cluster_id.process_id, for
+ *               example: fr2n02.32.0
+ *   JobID env var: LOADL_STEP_ID.
+ * PBS:
+ *   JobID format:  String of sequence_number[.server_name][@server].
+ *   JobID env var: PBS_JOBID.
+ * Maui/MOAB:
+ *   JobID format:  Same as PBS.
+ *   JobID env var: Same as PBS.
+ */
+
+/* Per-job statistics record; one instance exists for each jobid seen on an
+ * obd device and is owned by that device's obd_job_stats. */
+struct job_stat {
+       cfs_hlist_node_t      js_hash;      /* linkage in ojs_hash */
+       cfs_list_t            js_list;      /* linkage on ojs_list, walked by
+                                            * the job_stats seq_file */
+       cfs_atomic_t          js_refcount;  /* freed when it drops to zero */
+       char                  js_jobid[JOBSTATS_JOBID_SIZE]; /* hash key */
+       time_t                js_timestamp; /* seconds */
+       struct lprocfs_stats *js_stats;     /* the counters themselves */
+       struct obd_job_stats *js_jobstats;  /* back pointer to the owner */
+};
+
+/* hs_hash hook: djb2 hash over the NUL-terminated jobid string. */
+static unsigned job_stat_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+{
+       const char *jobid = key;
+
+       return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
+}
+
+/* hs_key hook: the key of a hashed job_stat is its jobid string. */
+static void *job_stat_key(cfs_hlist_node_t *hnode)
+{
+       return cfs_hlist_entry(hnode, struct job_stat, js_hash)->js_jobid;
+}
+
+/* hs_keycmp hook: non-zero when @key is exactly the jobid stored in the
+ * job_stat behind @hnode. */
+static int job_stat_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+       const struct job_stat *entry;
+
+       entry = cfs_hlist_entry(hnode, struct job_stat, js_hash);
+       /* same length and same bytes over that length == equal strings */
+       return strcmp(entry->js_jobid, key) == 0;
+}
+
+/* hs_object hook: map a hash node back to its enclosing job_stat. */
+static void *job_stat_object(cfs_hlist_node_t *hnode)
+{
+       struct job_stat *entry;
+
+       entry = cfs_hlist_entry(hnode, struct job_stat, js_hash);
+       return entry;
+}
+
+/* hs_get hook: take one reference on the job_stat behind @hnode. */
+static void job_stat_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       cfs_atomic_inc(&cfs_hlist_entry(hnode, struct job_stat,
+                                       js_hash)->js_refcount);
+}
+
+/* Destroy @job once its last reference is gone: unlink it from the owner's
+ * ojs_list under ojs_lock, then release its counters and the record. */
+static void job_free(struct job_stat *job)
+{
+       /* use the cfs_atomic_* wrappers like the rest of this file */
+       LASSERT(cfs_atomic_read(&job->js_refcount) == 0);
+       LASSERT(job->js_jobstats);
+
+       /* cfs_list_del_init() is also fine for an entry that was never
+        * linked (js_list is initialised as an empty head in job_alloc()) */
+       cfs_write_lock(&job->js_jobstats->ojs_lock);
+       cfs_list_del_init(&job->js_list);
+       cfs_write_unlock(&job->js_jobstats->ojs_lock);
+
+       lprocfs_free_stats(&job->js_stats);
+       OBD_FREE_PTR(job);
+}
+
+/* Drop one reference on @job and free it when that was the last one. */
+static void job_putref(struct job_stat *job)
+{
+       /* use the cfs_atomic_* wrappers like the rest of this file */
+       LASSERT(cfs_atomic_read(&job->js_refcount) > 0);
+       if (cfs_atomic_dec_and_test(&job->js_refcount))
+               job_free(job);
+}
+
+/* hs_put_locked hook: release the reference held on behalf of the hash. */
+static void job_stat_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       job_putref(cfs_hlist_entry(hnode, struct job_stat, js_hash));
+}
+
+/* hs_exit hook: only complains, since lprocfs_job_stats_fini() deletes every
+ * entry before it drops the hash. */
+static void job_stat_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       /* terminate the message so it does not run into the next log line */
+       CERROR("Should not have any items!\n");
+}
+
+/* Operations handed to cfs_hash_create() for the per-device jobid hash;
+ * entries are keyed by the js_jobid string and reference counted through
+ * js_refcount. */
+static cfs_hash_ops_t job_stats_hash_ops = {
+       .hs_hash       = job_stat_hash,
+       .hs_key        = job_stat_key,
+       .hs_keycmp     = job_stat_keycmp,
+       .hs_object     = job_stat_object,
+       .hs_get        = job_stat_get,
+       .hs_put_locked = job_stat_put_locked,
+       .hs_exit       = job_stat_exit,
+};
+
+/*
+ * Allocate and initialise a job_stat for @jobid owned by @jobs.
+ *
+ * The counters are created with the count and init callback registered in
+ * @jobs. The new record carries a single reference and is not yet hashed
+ * or linked on any list. Returns NULL if either allocation fails.
+ */
+static struct job_stat *job_alloc(char *jobid, struct obd_job_stats *jobs)
+{
+       struct job_stat *js;
+
+       LASSERT(jobs->ojs_cntr_num && jobs->ojs_cntr_init_fn);
+
+       OBD_ALLOC_PTR(js);
+       if (js == NULL)
+               return NULL;
+
+       js->js_stats = lprocfs_alloc_stats(jobs->ojs_cntr_num, 0);
+       if (js->js_stats == NULL) {
+               OBD_FREE_PTR(js);
+               return NULL;
+       }
+       jobs->ojs_cntr_init_fn(js->js_stats);
+
+       /* identity and ownership */
+       js->js_jobstats = jobs;
+       memcpy(js->js_jobid, jobid, JOBSTATS_JOBID_SIZE);
+       js->js_timestamp = cfs_time_current_sec();
+
+       /* linkage and the caller's reference */
+       CFS_INIT_LIST_HEAD(&js->js_list);
+       CFS_INIT_HLIST_NODE(&js->js_hash);
+       cfs_atomic_set(&js->js_refcount, 1);
+
+       return js;
+}
+
+/*
+ * Add @amount to counter @event of the job named @jobid on @obd, creating
+ * the per-job record on first use and refreshing its timestamp.
+ *
+ * \retval 0        counter updated
+ * \retval -EINVAL  @jobid is NULL, empty, or does not fit js_jobid
+ * \retval -ENOMEM  a new job record could not be allocated
+ */
+int lprocfs_job_stats_log(struct obd_device *obd, char *jobid,
+                         int event, long amount)
+{
+       struct obd_job_stats *stats = &obd->u.obt.obt_jobstats;
+       struct job_stat *job, *job2;
+       ENTRY;
+
+       LASSERT(stats && stats->ojs_hash);
+
+       if (!jobid || !strlen(jobid))
+               RETURN(-EINVAL);
+
+       if (strlen(jobid) >= JOBSTATS_JOBID_SIZE) {
+               CERROR("Invalid jobid size (%lu), expect(%d)\n",
+                      (unsigned long)strlen(jobid) + 1, JOBSTATS_JOBID_SIZE);
+               RETURN(-EINVAL);
+       }
+
+       /* fast path: the job is already known (lookup takes a reference) */
+       job = cfs_hash_lookup(stats->ojs_hash, jobid);
+       if (job)
+               goto found;
+
+       job = job_alloc(jobid, stats);
+       if (job == NULL)
+               RETURN(-ENOMEM);
+
+       job2 = cfs_hash_findadd_unique(stats->ojs_hash, job->js_jobid,
+                                      &job->js_hash);
+       if (job2 != job) {
+               /* Lost the insertion race: drop our copy and use the
+                * winner's entry. Do NOT assert that the winner is already
+                * on ojs_list - it is hashed first and linked on the list
+                * afterwards (see the else branch), so another thread may
+                * be between those two steps right now. */
+               job_putref(job);
+               job = job2;
+       } else {
+               LASSERT(cfs_list_empty(&job->js_list));
+               cfs_write_lock(&stats->ojs_lock);
+               cfs_list_add_tail(&job->js_list, &stats->ojs_list);
+               cfs_write_unlock(&stats->ojs_lock);
+       }
+
+found:
+       LASSERT(stats == job->js_jobstats);
+       LASSERT(stats->ojs_cntr_num > event);
+       job->js_timestamp = cfs_time_current_sec();
+       lprocfs_counter_add(job->js_stats, event, amount);
+
+       /* release the reference obtained from the lookup/insert above */
+       job_putref(job);
+       RETURN(0);
+}
+EXPORT_SYMBOL(lprocfs_job_stats_log);
+
+/*
+ * Hash iterator used for purging. @data points to a time_t cutoff: zero
+ * removes every entry, otherwise only entries whose timestamp is older
+ * than the cutoff are removed. Always returns 0.
+ */
+static int job_iter_callback(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                            cfs_hlist_node_t *hnode, void *data)
+{
+       struct job_stat *entry = cfs_hlist_entry(hnode, struct job_stat,
+                                                js_hash);
+       time_t cutoff = *(time_t *)data;
+
+       if (cutoff == 0 || entry->js_timestamp < cutoff)
+               cfs_hash_bd_del_locked(hs, bd, hnode);
+
+       return 0;
+}
+
+/*
+ * Tear down job stats for @obd: stop the cleanup timer, delete every job
+ * from the hash and drop the hash itself. Does nothing when the hash was
+ * never created or has already been torn down.
+ */
+void lprocfs_job_stats_fini(struct obd_device *obd)
+{
+       struct obd_job_stats *js = &obd->u.obt.obt_jobstats;
+       time_t purge_all = 0;
+
+       if (js->ojs_hash != NULL) {
+               cfs_timer_disarm(&js->ojs_cleanup_timer);
+               /* a zero cutoff makes the iterator delete everything */
+               cfs_hash_for_each_safe(js->ojs_hash, job_iter_callback,
+                                      &purge_all);
+               cfs_hash_putref(js->ojs_hash);
+               js->ojs_hash = NULL;
+               LASSERT(cfs_list_empty(&js->ojs_list));
+       }
+}
+EXPORT_SYMBOL(lprocfs_job_stats_fini);
+
+/*
+ * seq_file start: take ojs_lock for reading (released in the stop hook) and
+ * position the iterator. Position 0 is the header token, position N > 0 is
+ * the N-th job on ojs_list; NULL is returned past the end of the list.
+ */
+static void *lprocfs_jobstats_seq_start(struct seq_file *p, loff_t *pos)
+{
+       struct obd_job_stats *stats = p->private;
+       struct job_stat *cur;
+       loff_t skip;
+
+       cfs_read_lock(&stats->ojs_lock);
+       if (*pos == 0)
+               return SEQ_START_TOKEN;
+
+       /* the header occupies position 0, so list entries start at 1 */
+       skip = *pos - 1;
+       cfs_list_for_each_entry(cur, &stats->ojs_list, js_list) {
+               if (skip == 0)
+                       return cur;
+               skip--;
+       }
+       return NULL;
+}
+
+/* seq_file stop: release the read lock taken by the start hook. */
+static void lprocfs_jobstats_seq_stop(struct seq_file *p, void *v)
+{
+       cfs_read_unlock(&((struct obd_job_stats *)p->private)->ojs_lock);
+}
+
+/*
+ * seq_file next: advance *pos and return the job following @v on ojs_list
+ * (the first job when @v is the header token), or NULL at the end.
+ */
+static void *lprocfs_jobstats_seq_next(struct seq_file *p, void *v, loff_t *pos)
+{
+       struct obd_job_stats *stats = p->private;
+       cfs_list_t *head = &stats->ojs_list;
+       cfs_list_t *cur;
+
+       ++*pos;
+       /* the header token behaves like the list head itself */
+       if (v == SEQ_START_TOKEN)
+               cur = head;
+       else
+               cur = &((struct job_stat *)v)->js_list;
+
+       if (cur->next == head)
+               return NULL;
+       return cfs_list_entry(cur->next, struct job_stat, js_list);
+}
+
+/*
+ * Example of output on MDT:
+ *
+ * job_stats:
+ * - job_id:        test_id.222.25844
+ *   snapshot_time: 1322494486
+ *   open:          { samples:        3, unit: reqs }
+ *   close:         { samples:        3, unit: reqs }
+ *   mknod:         { samples:        0, unit: reqs }
+ *   link:          { samples:        0, unit: reqs }
+ *   unlink:        { samples:        0, unit: reqs }
+ *   mkdir:         { samples:        0, unit: reqs }
+ *   rmdir:         { samples:        0, unit: reqs }
+ *   rename:        { samples:        1, unit: reqs }
+ *   getattr:       { samples:        7, unit: reqs }
+ *   setattr:       { samples:        0, unit: reqs }
+ *   getxattr:      { samples:        0, unit: reqs }
+ *   setxattr:      { samples:        0, unit: reqs }
+ *   statfs:        { samples:        0, unit: reqs }
+ *   sync:          { samples:        0, unit: reqs }
+ *
+ * Example of output on OST:
+ *
+ * job_stats:
+ * - job_id:        4854
+ *   snapshot_time: 1322494602
+ *   read:          { samples:  0, unit: bytes, min:  0, max:  0, sum:  0 }
+ *   write:         { samples:  1, unit: bytes, min: 10, max: 10, sum: 10 }
+ *   setattr:       { samples:  0, unit: reqs }
+ *   punch:         { samples:  0, unit: reqs }
+ *   sync:          { samples:  0, unit: reqs }
+ */
+
+/* Padding source for the "%.*s" column alignment in the show hook. */
+static const char spaces[] = "                    ";
+
+/* Number of padding characters needed to bring @str up to @len columns;
+ * the length of @str is capped at 15 for this computation. */
+static int inline width(const char *str, int len)
+{
+       int used = (int)strlen(str);
+
+       if (used > 15)
+               used = 15;
+       return len - used;
+}
+
+/*
+ * seq_file show: print the "job_stats:" header for the start token, or one
+ * job record (jobid, last-update timestamp and every counter) for @v.
+ * Always returns 0.
+ */
+static int lprocfs_jobstats_seq_show(struct seq_file *p, void *v)
+{
+       struct job_stat *job = v;
+       struct lprocfs_stats *s;
+       struct lprocfs_counter ret, *cntr;
+       int i;
+
+       if (v == SEQ_START_TOKEN) {
+               seq_printf(p, "job_stats:\n");
+               return 0;
+       }
+
+       seq_printf(p, "- %-16s %s\n", "job_id:", job->js_jobid);
+       seq_printf(p, "  %-16s %ld\n", "snapshot_time:", job->js_timestamp);
+
+       s = job->js_stats;
+       for (i = 0; i < s->ls_num; i++) {
+               /* name, units and config are read from slot 0; the values
+                * printed below come from lprocfs_stats_collect() */
+               cntr = &(s->ls_percpu[0]->lp_cntr[i]);
+               lprocfs_stats_collect(s, i, &ret);
+
+               /* pad after the counter name so the braces line up */
+               seq_printf(p, "  %s:%.*s { samples: %11"LPF64"u",
+                          cntr->lc_name, width(cntr->lc_name, 15), spaces,
+                          ret.lc_count);
+               if (cntr->lc_units[0] != '\0')
+                       seq_printf(p, ", unit: %5s", cntr->lc_units);
+
+               /* counters without samples print 0 for min/max/sum/sumsq */
+               if (cntr->lc_config & LPROCFS_CNTR_AVGMINMAX) {
+                       seq_printf(p, ", min:%8"LPF64"u, max:%8"LPF64"u,"
+                                  " sum:%16"LPF64"u",
+                                  ret.lc_count ? ret.lc_min : 0,
+                                  ret.lc_count ? ret.lc_max : 0,
+                                  ret.lc_count ? ret.lc_sum : 0);
+               }
+               if (cntr->lc_config & LPROCFS_CNTR_STDDEV) {
+                       seq_printf(p, ", sumsq: %18"LPF64"u",
+                                  ret.lc_count ? ret.lc_sumsquare : 0);
+               }
+
+               seq_printf(p, " }\n");
+
+       }
+       return 0;
+}
+
+/* Iterator callbacks behind the "job_stats" proc file. Uses C99 designated
+ * initialisers, like the other operation tables in this file, instead of
+ * the obsolete GNU "field:" form. */
+struct seq_operations lprocfs_jobstats_seq_sops = {
+       .start = lprocfs_jobstats_seq_start,
+       .stop  = lprocfs_jobstats_seq_stop,
+       .next  = lprocfs_jobstats_seq_next,
+       .show  = lprocfs_jobstats_seq_show,
+};
+
+/*
+ * Open "job_stats": attach the seq_file iterator and hand it the
+ * obd_job_stats stored in the proc entry's data pointer. Returns -ENOENT
+ * when LPROCFS_ENTRY_AND_CHECK() rejects the entry, otherwise the result of
+ * seq_open(). LPROCFS_EXIT() is only called here on the seq_open() failure
+ * path.
+ */
+static int lprocfs_jobstats_seq_open(struct inode *inode, struct file *file)
+{
+       struct proc_dir_entry *pde = PDE(inode);
+       int rc;
+
+       if (LPROCFS_ENTRY_AND_CHECK(pde))
+               return -ENOENT;
+
+       rc = seq_open(file, &lprocfs_jobstats_seq_sops);
+       if (rc != 0) {
+               LPROCFS_EXIT();
+       } else {
+               struct seq_file *sf = file->private_data;
+
+               sf->private = pde->data;
+       }
+       return rc;
+}
+
+/*
+ * Write handler of "job_stats".
+ *
+ * Input starting with "clear" drops the stats of every job; any other input
+ * is taken as a jobid (one trailing '\n' is trimmed) whose stats are
+ * dropped.
+ *
+ * \retval len      the request was carried out
+ * \retval -EINVAL  empty input, jobid too long, or no such job
+ * \retval -EFAULT  the user buffer could not be read
+ */
+static ssize_t lprocfs_jobstats_seq_write(struct file *file, const char *buf,
+                                         size_t len, loff_t *off)
+{
+       struct seq_file *seq = file->private_data;
+       struct obd_job_stats *stats = seq->private;
+       char jobid[JOBSTATS_JOBID_SIZE];
+       size_t copied;
+       struct job_stat *job;
+
+       /* nothing to parse; also keeps "len - 1" below from wrapping */
+       if (len == 0)
+               return -EINVAL;
+
+       /* @buf points to user memory and must not be dereferenced directly.
+        * Copy at most what fits, always leaving room for the final NUL, so
+        * short writes are never read past their end either. */
+       copied = len < sizeof(jobid) - 1 ? len : sizeof(jobid) - 1;
+       memset(jobid, 0, sizeof(jobid));
+       if (cfs_copy_from_user(jobid, buf, copied))
+               return -EFAULT;
+
+       LASSERT(stats->ojs_hash);
+       if (!strncmp(jobid, "clear", strlen("clear"))) {
+               /* a zero cutoff makes the iterator delete everything */
+               time_t oldest = 0;
+               cfs_hash_for_each_safe(stats->ojs_hash, job_iter_callback,
+                                      &oldest);
+               return len;
+       }
+
+       if (len >= JOBSTATS_JOBID_SIZE)
+               return -EINVAL;
+
+       /* Trim '\n' if any */
+       if (jobid[len - 1] == '\n')
+               jobid[len - 1] = '\0';
+
+       if (!strlen(jobid))
+               return -EINVAL;
+
+       /* the lookup takes a reference that is dropped again below */
+       job = cfs_hash_lookup(stats->ojs_hash, jobid);
+       if (!job)
+               return -EINVAL;
+
+       cfs_hash_del_key(stats->ojs_hash, jobid);
+
+       job_putref(job);
+       return len;
+}
+
+/* File operations installed on the "job_stats" proc entry: reads go through
+ * the seq_file iterator, writes clear one job or all jobs. */
+struct file_operations lprocfs_jobstats_seq_fops = {
+       .owner   = THIS_MODULE,
+       .open    = lprocfs_jobstats_seq_open,
+       .read    = seq_read,
+       .write   = lprocfs_jobstats_seq_write,
+       .llseek  = seq_lseek,
+       .release = lprocfs_seq_release,
+};
+
+/*
+ * Cleanup timer callback; @data is the obd_job_stats it was registered with.
+ * Removes every job not updated within the last ojs_cleanup_interval seconds
+ * and re-arms itself. With a zero interval it does nothing and does not
+ * re-arm.
+ *
+ * NOTE(review): this runs as a cfs_timer callback - confirm that
+ * cfs_hash_for_each_safe() may be called from that context.
+ */
+static void job_cleanup_callback(unsigned long data)
+{
+       struct obd_job_stats *stats = (struct obd_job_stats *)data;
+       time_t cutoff;
+
+       if (stats->ojs_cleanup_interval == 0)
+               return;
+
+       cutoff = cfs_time_current_sec() - stats->ojs_cleanup_interval;
+       cfs_hash_for_each_safe(stats->ojs_hash, job_iter_callback, &cutoff);
+       cfs_timer_arm(&stats->ojs_cleanup_timer,
+                     cfs_time_shift(stats->ojs_cleanup_interval));
+}
+
+/*
+ * Set up job stats for @obd: create the jobid hash, start the periodic
+ * cleanup timer and publish the "job_stats" proc file under the device's
+ * proc directory.
+ *
+ * \param obd       device to attach to; must be an MDT or OST type device
+ * \param cntr_num  number of counters kept per job
+ * \param init_fn   callback that names/configures the counters of each
+ *                  newly allocated job record
+ *
+ * \retval 0        success
+ * \retval -EINVAL  @obd is neither LUSTRE_MDT_NAME nor LUSTRE_OST_NAME
+ * \retval -ENOMEM  the hash or the proc entry could not be created
+ */
+int lprocfs_job_stats_init(struct obd_device *obd, int cntr_num,
+                          cntr_init_callback init_fn)
+{
+       struct proc_dir_entry *entry;
+       struct obd_job_stats *stats;
+       ENTRY;
+
+       LASSERT(obd->obd_proc_entry != NULL);
+       LASSERT(obd->obd_type->typ_name);
+
+       if (strcmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME) &&
+           strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME)) {
+               CERROR("Invalid obd device type.\n");
+               RETURN(-EINVAL);
+       }
+       stats = &obd->u.obt.obt_jobstats;
+
+       LASSERT(stats->ojs_hash == NULL);
+       stats->ojs_hash = cfs_hash_create("JOB_STATS",
+                                         HASH_JOB_STATS_CUR_BITS,
+                                         HASH_JOB_STATS_MAX_BITS,
+                                         HASH_JOB_STATS_BKT_BITS, 0,
+                                         CFS_HASH_MIN_THETA,
+                                         CFS_HASH_MAX_THETA,
+                                         &job_stats_hash_ops,
+                                         CFS_HASH_DEFAULT);
+       if (stats->ojs_hash == NULL)
+               RETURN(-ENOMEM);
+
+       CFS_INIT_LIST_HEAD(&stats->ojs_list);
+       cfs_rwlock_init(&stats->ojs_lock);
+       stats->ojs_cntr_num = cntr_num;
+       stats->ojs_cntr_init_fn = init_fn;
+       cfs_timer_init(&stats->ojs_cleanup_timer, job_cleanup_callback, stats);
+       stats->ojs_cleanup_interval = 600; /* 10 mins by default */
+       cfs_timer_arm(&stats->ojs_cleanup_timer,
+                     cfs_time_shift(stats->ojs_cleanup_interval));
+
+       LPROCFS_WRITE_ENTRY();
+       entry = create_proc_entry("job_stats", 0644, obd->obd_proc_entry);
+       LPROCFS_WRITE_EXIT();
+       if (entry) {
+               entry->proc_fops = &lprocfs_jobstats_seq_fops;
+               entry->data = stats;
+               RETURN(0);
+       } else {
+               /* undo the hash and timer set up above */
+               lprocfs_job_stats_fini(obd);
+               RETURN(-ENOMEM);
+       }
+}
+EXPORT_SYMBOL(lprocfs_job_stats_init);
+
+/* Proc read handler for "job_cleanup_interval": prints the current cleanup
+ * interval of the device passed in @data, followed by a newline. */
+int lprocfs_rd_job_interval(char *page, char **start, off_t off,
+                           int count, int *eof, void *data)
+{
+       struct obd_device *obd = data;
+
+       LASSERT(obd != NULL);
+       *eof = 1;
+       return snprintf(page, count, "%d\n",
+                       obd->u.obt.obt_jobstats.ojs_cleanup_interval);
+}
+EXPORT_SYMBOL(lprocfs_rd_job_interval);
+
+/*
+ * Proc write handler for "job_cleanup_interval".
+ *
+ * Stores the new interval for the device passed in @data. Zero stops the
+ * cleanup timer, a positive value re-arms it with the new period.
+ *
+ * \retval count    the interval was updated
+ * \retval -EINVAL  a negative interval was given
+ * \retval other    error returned by lprocfs_write_helper()
+ */
+int lprocfs_wr_job_interval(struct file *file, const char *buffer,
+                           unsigned long count, void *data)
+{
+       struct obd_device *obd = (struct obd_device *)data;
+       struct obd_job_stats *stats;
+       int val, rc;
+
+       LASSERT(obd != NULL);
+       stats = &obd->u.obt.obt_jobstats;
+
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       /* A negative period is meaningless: it would arm the timer in the
+        * past and put the purge cutoff in the future. */
+       if (val < 0)
+               return -EINVAL;
+
+       stats->ojs_cleanup_interval = val;
+       if (!stats->ojs_cleanup_interval)
+               cfs_timer_disarm(&stats->ojs_cleanup_timer);
+       else
+               cfs_timer_arm(&stats->ojs_cleanup_timer,
+                             cfs_time_shift(stats->ojs_cleanup_interval));
+
+       return count;
+
+}
+EXPORT_SYMBOL(lprocfs_wr_job_interval);
+
+#endif /* LPROCFS*/
index 97af8b5..71aae20 100644 (file)
@@ -2224,10 +2224,15 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                 GOTO(free_obd_stats, rc);
         }
 
+       rc = lprocfs_job_stats_init(obd, LPROC_FILTER_STATS_LAST,
+                                   filter_stats_counter_init);
+       if (rc)
+               GOTO(remove_entry_clear, rc);
+
         /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
         OBD_PAGE_ALLOC(page, CFS_ALLOC_STD);
         if (!page)
-                GOTO(remove_entry_clear, rc = -ENOMEM);
+               GOTO(job_stats_fini, rc = -ENOMEM);
         addr = (unsigned long)cfs_page_address(page);
         clear_page((void *)addr);
         memcpy((void *)addr, lustre_cfg_buf(lcfg, 4),
@@ -2237,11 +2242,13 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         if (rc) {
                 CERROR("%s: filter_common_setup failed: %d.\n",
                        obd->obd_name, rc);
-                GOTO(remove_entry_clear, rc);
+               GOTO(job_stats_fini, rc);
         }
 
         RETURN(0);
 
+job_stats_fini:
+       lprocfs_job_stats_fini(obd);
 remove_entry_clear:
         lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
 free_obd_stats:
@@ -2618,6 +2625,7 @@ static int filter_precleanup(struct obd_device *obd,
                 obd_zombie_barrier();
 
                 rc = filter_llog_preclean(obd);
+               lprocfs_job_stats_fini(obd);
                 lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
                 lprocfs_free_per_client_stats(obd);
                 lprocfs_obd_cleanup(obd);
@@ -3513,6 +3521,8 @@ int filter_setattr(const struct lu_env *env, struct obd_export *exp,
         obdo_from_inode(oa, dentry->d_inode,
                         FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID);
 
+       filter_counter_incr(exp, LPROC_FILTER_STATS_SETATTR,
+                           oti ? oti->oti_jobid : NULL, 1);
         EXIT;
 out_unlock:
         f_dput(dentry);
@@ -4412,6 +4422,7 @@ static int filter_sync(const struct lu_env *env, struct obd_export *exp,
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
+       filter_counter_incr(exp, LPROC_FILTER_STATS_SYNC, oinfo->oi_jobid, 1);
         f_dput(dentry);
         RETURN(rc);
 }
index 27c41a0..67f40a7 100644 (file)
@@ -113,6 +113,24 @@ enum {
         LPROC_FILTER_LAST,
 };
 
+/* for job stats */
+/* Indices of the per-job counters kept by obdfilter; used as the opcode
+ * argument of filter_counter_incr(). */
+enum {
+       LPROC_FILTER_STATS_READ = 0,    /* "read", in bytes */
+       LPROC_FILTER_STATS_WRITE = 1,   /* "write", in bytes */
+       LPROC_FILTER_STATS_SETATTR = 2, /* "setattr", in requests */
+       LPROC_FILTER_STATS_PUNCH = 3,   /* "punch", in requests */
+       LPROC_FILTER_STATS_SYNC = 4,    /* "sync", in requests */
+       LPROC_FILTER_STATS_LAST,        /* number of counters, not an index */
+};
+
+/*
+ * Account @amount against job counter @opcode for @jobid on the device
+ * behind @exp. Silently does nothing when the export has no device, job
+ * stats are not set up on it, or the client did not negotiate
+ * OBD_CONNECT_JOBSTATS.
+ */
+static inline void filter_counter_incr(struct obd_export *exp, int opcode,
+                                      char *jobid, long amount)
+{
+       struct obd_device *obd = exp->exp_obd;
+
+       if (obd == NULL)
+               return;
+       if (obd->u.obt.obt_jobstats.ojs_hash == NULL)
+               return;
+       if (!(exp->exp_connect_flags & OBD_CONNECT_JOBSTATS))
+               return;
+
+       lprocfs_job_stats_log(obd, jobid, opcode, amount);
+}
+
 //#define FILTER_MAX_CACHE_SIZE (32 * 1024 * 1024) /* was OBD_OBJECT_EOF */
 #define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF
 
@@ -220,6 +238,7 @@ void filter_tally(struct obd_export *exp, struct page **pages, int nr_pages,
                   unsigned long *blocks, int blocks_per_page, int wr);
 int lproc_filter_attach_seqstat(struct obd_device *dev);
 void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars);
+void filter_stats_counter_init(struct lprocfs_stats *stats);
 #else
 static inline void filter_tally(struct obd_export *exp, struct page **pages,
                                 int nr_pages, unsigned long *blocks,
@@ -229,6 +248,7 @@ static void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars)
 {
         memset(lvars, 0, sizeof(*lvars));
 }
+static inline void filter_stats_counter_init(struct lprocfs_stats *stats) {}
 #endif
 
 /* Quota stuff */
index d935942..29a0f73 100644 (file)
@@ -472,7 +472,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
                 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
                                     LPROC_FILTER_READ_BYTES, tot_bytes);
-
+       filter_counter_incr(exp, LPROC_FILTER_STATS_READ,
+                           oti ? oti->oti_jobid : NULL, tot_bytes);
         EXIT;
 
  cleanup:
@@ -864,6 +865,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
                 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
                                     LPROC_FILTER_WRITE_BYTES, tot_bytes);
+       filter_counter_incr(exp, LPROC_FILTER_STATS_WRITE,
+                           oti ? oti->oti_jobid : NULL, tot_bytes);
         EXIT;
 cleanup:
         switch(cleanup_phase) {
index 132aeb3..12871fe 100644 (file)
@@ -480,6 +480,8 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "instance",     lprocfs_target_rd_instance, 0 },
         { "ir_factor",    lprocfs_obd_rd_ir_factor,
                           lprocfs_obd_wr_ir_factor, 0},
+       { "job_cleanup_interval", lprocfs_rd_job_interval,
+                                 lprocfs_wr_job_interval, 0},
         { 0 }
 };
 
@@ -682,4 +684,20 @@ static ssize_t filter_per_nid_stats_seq_write(struct file *file,
 }
 
 LPROC_SEQ_FOPS(filter_per_nid_stats);
+
+/* Name and configure the per-job counters of a freshly allocated stats
+ * block; @stats must have exactly LPROC_FILTER_STATS_LAST counters. */
+void filter_stats_counter_init(struct lprocfs_stats *stats)
+{
+       static const struct {
+               int         idx;
+               unsigned    conf;
+               const char *name;
+               const char *units;
+       } cntrs[] = {
+               { LPROC_FILTER_STATS_READ,
+                 LPROCFS_CNTR_AVGMINMAX, "read", "bytes" },
+               { LPROC_FILTER_STATS_WRITE,
+                 LPROCFS_CNTR_AVGMINMAX, "write", "bytes" },
+               { LPROC_FILTER_STATS_SETATTR, 0, "setattr", "reqs" },
+               { LPROC_FILTER_STATS_PUNCH, 0, "punch", "reqs" },
+               { LPROC_FILTER_STATS_SYNC, 0, "sync", "reqs" },
+       };
+       int i;
+
+       LASSERT(stats && stats->ls_num == LPROC_FILTER_STATS_LAST);
+       for (i = 0; i < (int)(sizeof(cntrs) / sizeof(cntrs[0])); i++)
+               lprocfs_counter_init(stats, cntrs[i].idx, cntrs[i].conf,
+                                    cntrs[i].name, cntrs[i].units);
+}
+
 #endif /* LPROCFS */
index 1efb9a2..f1bb130 100644 (file)
@@ -543,7 +543,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
 
         ptlrpc_request_set_replen(req);
 
-
         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
         sa = ptlrpc_req_async_args(req);
@@ -2379,6 +2378,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         LASSERT(ops != NULL);
         crattr.cra_oa = oa;
         crattr.cra_capa = NULL;
+       memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
         if (lock) {
                 oa->o_handle = lock->l_remote_handle;
@@ -2410,6 +2410,8 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         cl_req_attr_set(env, clerq, &crattr,
                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
 
+       lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
+
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
index 547dc8c..30cbf17 100644 (file)
@@ -428,7 +428,8 @@ unlock:
         RETURN(rc);
 }
 
-static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
+static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
+                   struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
         struct obd_info *oinfo;
@@ -465,6 +466,7 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
 
         oinfo->oi_oa = &repbody->oa;
         oinfo->oi_capa = capa;
+       oinfo->oi_jobid = oti->oti_jobid;
         req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
                                   repbody->oa.o_size, repbody->oa.o_blocks,
                                   NULL);
@@ -2119,6 +2121,10 @@ int ost_handle(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
+       if (req && req->rq_reqmsg && req->rq_export &&
+           (req->rq_export->exp_connect_flags & OBD_CONNECT_JOBSTATS))
+               oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
+
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case OST_CONNECT: {
                 CDEBUG(D_INODE, "connect\n");
@@ -2233,7 +2239,7 @@ int ost_handle(struct ptlrpc_request *req)
                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
                         RETURN(0);
-                rc = ost_sync(req->rq_export, req);
+               rc = ost_sync(req->rq_export, req, oti);
                 break;
         case OST_SET_INFO:
                 DEBUG_REQ(D_INODE, req, "set_info");
index 0f159e5..2995e6a 100644 (file)
@@ -606,7 +606,30 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 int ptlrpc_request_pack(struct ptlrpc_request *request,
                         __u32 version, int opcode)
 {
-        return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
+       int rc;
+       rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
+       if (rc)
+               return rc;
+
+       /* Some old 1.8 clients (< 1.8.7) LASSERT that the size of the
+        * ptlrpc_body sent by the server equals their local ptlrpc_body
+        * size, so we have to send the old ptlrpc_body to keep
+        * interoperability with these clients.
+        *
+        * Only three kinds of server->client RPCs so far:
+        *  - LDLM_BL_CALLBACK
+        *  - LDLM_CP_CALLBACK
+        *  - LDLM_GL_CALLBACK
+        *
+        * XXX This should be removed whenever we drop interoperability with
+        *     these old clients.
+        */
+       if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
+           opcode == LDLM_GL_CALLBACK)
+               req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
+                                  sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
+
+       return rc;
 }
 
 /**
@@ -951,6 +974,7 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
+       char jobid[JOBSTATS_JOBID_SIZE];
         LASSERT(cfs_list_empty(&req->rq_set_chain));
 
         /* The set takes over the caller's request reference */
@@ -958,6 +982,11 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         req->rq_set = set;
         cfs_atomic_inc(&set->set_remaining);
         req->rq_queued_time = cfs_time_current();
+
+       if (req->rq_reqmsg) {
+               lustre_get_jobid(jobid);
+               lustre_msg_set_jobid(req->rq_reqmsg, jobid);
+       }
 }
 
 /**
index c94f5e0..c65ae1c 100644 (file)
@@ -476,6 +476,16 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
                        req->rq_export->exp_obd->obd_minor);
         }
 
+       /* In order to keep interoperability with clients (< 2.3) which
+        * don't have pb_jobid in ptlrpc_body, we have to shrink the
+        * ptlrpc_body in the reply buffer to ptlrpc_body_v2; otherwise the
+        * reply buffer on the client will overflow.
+        *
+        * XXX Remove this whenever we drop interoperability with such clients.
+        */
+       req->rq_replen = lustre_shrink_msg(req->rq_repmsg, 0,
+                                          sizeof(struct ptlrpc_body_v2), 1);
+
         if (req->rq_type != PTL_RPC_MSG_ERR)
                 req->rq_type = PTL_RPC_MSG_REPLY;
 
index 14ee58c..545d767 100644 (file)
@@ -118,8 +118,17 @@ int lustre_msg_check_version(struct lustre_msg *msg, __u32 version)
 int lustre_msg_early_size()
 {
         static int size = 0;
-        if (!size)
-                size = lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+       if (!size) {
+               /* Always reply with the old ptlrpc_body_v2 to keep
+                * interoperability with old clients (< 2.3) which don't
+                * have pb_jobid in the ptlrpc_body.
+                *
+                * XXX Remove this whenever we drop interoperability with
+                *     such clients.
+                */
+               __u32 pblen = sizeof(struct ptlrpc_body_v2);
+               size = lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, &pblen);
+       }
         return size;
 }
 EXPORT_SYMBOL(lustre_msg_early_size);
@@ -153,7 +162,7 @@ int lustre_msg_size(__u32 magic, int count, __u32 *lens)
         }
 
         LASSERT(count > 0);
-        LASSERT(lens[MSG_PTLRPC_BODY_OFF] == sizeof(struct ptlrpc_body));
+       LASSERT(lens[MSG_PTLRPC_BODY_OFF] >= sizeof(struct ptlrpc_body_v2));
 
         switch (magic) {
         case LUSTRE_MSG_MAGIC_V2:
@@ -607,7 +616,7 @@ static inline int lustre_unpack_ptlrpc_body_v2(struct ptlrpc_request *req,
         struct ptlrpc_body *pb;
         struct lustre_msg_v2 *m = inout ? req->rq_reqmsg : req->rq_repmsg;
 
-        pb = lustre_msg_buf_v2(m, offset, sizeof(*pb));
+       pb = lustre_msg_buf_v2(m, offset, sizeof(struct ptlrpc_body_v2));
         if (!pb) {
                 CERROR("error unpacking ptlrpc body\n");
                 return -EFAULT;
@@ -781,7 +790,7 @@ static inline void *__lustre_swab_buf(struct lustre_msg *msg, int index,
 static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
 {
+       /* Request the buffer using the ptlrpc_body_v2 size instead of the
+        * full ptlrpc_body size.  NOTE(review): presumably so that a body
+        * without pb_jobid is still returned -- confirm against how
+        * lustre_msg_buf_v2() treats its size argument. */
         return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
-                                 sizeof(struct ptlrpc_body));
+                                sizeof(struct ptlrpc_body_v2));
 }
 
 __u32 lustre_msghdr_get_flags(struct lustre_msg *msg)
@@ -1241,6 +1250,28 @@ __u32 lustre_msg_get_service_time(struct lustre_msg *msg)
         }
 }
 
+/**
+ * Return the pb_jobid field of the ptlrpc_body in \a msg.
+ *
+ * The returned pointer is the pb_jobid member of the body handed back by
+ * lustre_msg_buf_v2(); no copy is made here.
+ *
+ * \param msg message to inspect; dereferenced unconditionally
+ *
+ * \retval pb_jobid of the body for LUSTRE_MSG_MAGIC_V2 messages
+ * \retval NULL for V1 and swabbed V1 messages, when lustre_msg_buf_v2()
+ *         returns no body for sizeof(struct ptlrpc_body), or (after a
+ *         CERROR) for any other magic
+ */
+char *lustre_msg_get_jobid(struct lustre_msg *msg)
+{
+       switch (msg->lm_magic) {
+       case LUSTRE_MSG_MAGIC_V1:
+       case LUSTRE_MSG_MAGIC_V1_SWABBED:
+               return NULL;
+       case LUSTRE_MSG_MAGIC_V2: {
+               struct ptlrpc_body *pb =
+                       lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
+                                         sizeof(struct ptlrpc_body));
+               if (!pb)
+                       return NULL;
+
+               return pb->pb_jobid;
+       }
+       default:
+               CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+               return NULL;
+       }
+}
+EXPORT_SYMBOL(lustre_msg_get_jobid);
+
 __u32 lustre_msg_get_cksum(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
@@ -1450,6 +1481,33 @@ void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time)
         }
 }
 
+/**
+ * Copy JOBSTATS_JOBID_SIZE bytes from \a jobid into the pb_jobid field of
+ * the ptlrpc_body in \a msg.
+ *
+ * Nothing is written for LUSTRE_MSG_MAGIC_V1 messages, nor for V2 messages
+ * whose opcode is 0 or one of the three LDLM callback opcodes.  A V2
+ * message for which lustre_msg_buf_v2() returns no body of
+ * sizeof(struct ptlrpc_body), or a message with any other magic, trips an
+ * LASSERTF.
+ *
+ * \param msg   message to update; dereferenced unconditionally
+ * \param jobid source buffer; memcpy() reads JOBSTATS_JOBID_SIZE bytes from
+ *              it, so it must be at least that large
+ */
+void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid)
+{
+       switch (msg->lm_magic) {
+       case LUSTRE_MSG_MAGIC_V1:
+               return;
+       case LUSTRE_MSG_MAGIC_V2: {
+               __u32 opc = lustre_msg_get_opc(msg);
+               struct ptlrpc_body *pb;
+
+               /* Don't set jobid for ldlm ast RPCs, they've been shrunk.
+                * See the comment in ptlrpc_request_pack(). */
+               if (!opc || opc == LDLM_BL_CALLBACK ||
+                   opc == LDLM_CP_CALLBACK || opc == LDLM_GL_CALLBACK)
+                       return;
+
+               pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
+                                      sizeof(struct ptlrpc_body));
+               LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
+               memcpy(pb->pb_jobid, jobid, JOBSTATS_JOBID_SIZE);
+               return;
+       }
+       default:
+               LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+       }
+}
+EXPORT_SYMBOL(lustre_msg_set_jobid);
+
 void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum)
 {
         switch (msg->lm_magic) {
@@ -1555,6 +1613,12 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b)
         __swab64s (&b->pb_pre_versions[2]);
         __swab64s (&b->pb_pre_versions[3]);
         CLASSERT(offsetof(typeof(*b), pb_padding) != 0);
+       /* While we need to maintain compatibility between
+        * clients and servers without ptlrpc_body_v2 (< 2.3)
+        * do not swab any fields beyond pb_jobid, as we are
+        * using this swab function for both ptlrpc_body
+        * and ptlrpc_body_v2. */
+       CLASSERT(offsetof(typeof(*b), pb_jobid) != 0);
 }
 
 void lustre_swab_connect(struct obd_connect_data *ocd)
index a0eba5f..32b5b17 100644 (file)
@@ -237,6 +237,12 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
 void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
 {
         struct ptlrpcd_ctl *pc;
+       char jobid[JOBSTATS_JOBID_SIZE];
+
+       if (req->rq_reqmsg) {
+               lustre_get_jobid(jobid);
+               lustre_msg_set_jobid(req->rq_reqmsg, jobid);
+       }
 
         cfs_spin_lock(&req->rq_lock);
         if (req->rq_invalid_rqset) {
index 2a15654..fc34871 100644 (file)
@@ -639,7 +639,7 @@ void lustre_assert_wire_constants(void)
                  LUSTRE_MSG_MAGIC_V2_SWABBED);
 
         /* Checks for struct ptlrpc_body */
-        LASSERTF((int)sizeof(struct ptlrpc_body) == 152, "found %lld\n",
+       LASSERTF((int)sizeof(struct ptlrpc_body) == 184, " found %lld\n",
                  (long long)(int)sizeof(struct ptlrpc_body));
         LASSERTF((int)offsetof(struct ptlrpc_body, pb_handle) == 0, "found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_handle));
@@ -697,14 +697,14 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_service_time));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time));
-        LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, "found %lld\n",
-                 (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
-        LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, "found %lld\n",
-                 (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
-        LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, "found %lld\n",
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
+       LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
         CLASSERT(PTLRPC_NUM_VERSIONS == 4);
         LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, "found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
@@ -714,6 +714,12 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_padding[4]));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding[4]) == 8, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding[4]));
+       CLASSERT(JOBSTATS_JOBID_SIZE == 32);
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_jobid) == 152, " found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body, pb_jobid));
+       LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_jobid) == 32, " found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_jobid));
+
         LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n",
                  (long long)MSG_PTLRPC_BODY_OFF);
         LASSERTF(REQ_REC_OFF == 1, "found %lld\n",
index ae63acb..f1c0519 100644 (file)
@@ -8713,6 +8713,106 @@ test_204h() {
 }
 run_test 204h "Print raw stripe count and size ============="
 
+# Figure out which job scheduler is being used, if any,
+# or use a fake one
+if [ -n "$SLURM_JOB_ID" ]; then # SLURM
+       JOBENV=SLURM_JOB_ID
+elif [ -n "$LSB_JOBID" ]; then # Load Sharing Facility
+       JOBENV=LSB_JOBID
+elif [ -n "$PBS_JOBID" ]; then # PBS/Maui/Moab
+       JOBENV=PBS_JOBID
+elif [ -n "$LOADL_STEPID" ]; then # LoadLeveller
+       JOBENV=LOADL_STEP_ID
+elif [ -n "$JOB_ID" ]; then # Sun Grid Engine
+       JOBENV=JOB_ID
+else
+       JOBENV=FAKE_JOBID
+fi
+
+# Run a command under a known JobID and check that the JobID shows up in
+# the job_stats output of the requested targets.
+#
+# $1 - command line to run (expanded unquoted, so it is word-split)
+# $2 - where to look for the JobID: "mdt", "ost" or "both"
+#
+# Assigns the non-local variables JOBVAL and FACET (and FAKE_JOBID when
+# JOBENV is "FAKE_JOBID").  Calls error if the JobID is not found.
+verify_jobstats() {
+       local cmd=$1
+       local target=$2
+
+       # clear old jobstats
+       # NOTE(review): facet "ost0" is used here and below -- confirm this
+       # is a valid facet name in the test framework
+       do_facet $SINGLEMDS lctl set_param mdt.*.job_stats="clear"
+       do_facet ost0 lctl set_param obdfilter.*.job_stats="clear"
+
+       # use a new JobID for this test, or we might see an old one
+       [ "$JOBENV" = "FAKE_JOBID" ] && FAKE_JOBID=test_id.$testnum.$RANDOM
+
+       # indirect expansion: the value of the variable named by $JOBENV
+       JOBVAL=${!JOBENV}
+       log "Test: $cmd"
+       log "Using JobID environment variable $JOBENV=$JOBVAL"
+
+       # FAKE_JOBID was only assigned (not exported) above, so hand it to
+       # the command through its environment
+       if [ $JOBENV = "FAKE_JOBID" ]; then
+               FAKE_JOBID=$JOBVAL $cmd
+       else
+               $cmd
+       fi
+
+       # NOTE(review): $JOBVAL is passed to grep unquoted and as a pattern
+       if [ "$target" = "mdt" -o "$target" = "both" ]; then
+               FACET="$SINGLEMDS" # will need to get MDS number for DNE
+               do_facet $FACET lctl get_param mdt.*.job_stats |
+                       grep $JOBVAL || error "No job stats found on MDT $FACET"
+       fi
+       if [ "$target" = "ost" -o "$target" = "both" ]; then
+               FACET=ost0
+               do_facet $FACET lctl get_param obdfilter.*.job_stats |
+                       grep $JOBVAL || error "No job stats found on OST $FACET"
+       fi
+}
+
+# Job stats test: point llite jobid_var at $JOBENV if it differs, run a
+# series of metadata and data operations through verify_jobstats, then
+# put the previous jobid_var value back.  Returns 1 if either jobid_var
+# change is not observed by wait_update.
+test_205() { # Job stats
+       local cmd
+       OLD_JOBENV=`$LCTL get_param -n llite.*.jobid_var`
+       # NOTE(review): $OLD_JOBENV is unquoted in the comparisons below; an
+       # empty or multi-word value would break the test expression
+       if [ $OLD_JOBENV != $JOBENV ]; then
+               do_facet mgs $LCTL conf_param $FSNAME.llite.jobid_var=$JOBENV
+               wait_update $HOSTNAME "$LCTL get_param -n llite.*.jobid_var" \
+                       $JOBENV || return 1
+       fi
+
+       # mkdir
+       cmd="mkdir $DIR/$tfile"
+       verify_jobstats "$cmd" "mdt"
+       # rmdir
+       cmd="rm -fr $DIR/$tfile"
+       verify_jobstats "$cmd" "mdt"
+       # mknod
+       cmd="mknod $DIR/$tfile c 1 3"
+       verify_jobstats "$cmd" "mdt"
+       # unlink
+       cmd="rm -f $DIR/$tfile"
+       verify_jobstats "$cmd" "mdt"
+       # open & close
+       cmd="$SETSTRIPE -i 0 -c 1 $DIR/$tfile"
+       verify_jobstats "$cmd" "mdt"
+       # setattr
+       cmd="touch $DIR/$tfile"
+       verify_jobstats "$cmd" "both"
+       # write
+       cmd="dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 oflag=sync"
+       verify_jobstats "$cmd" "ost"
+       # read
+       cmd="dd if=$DIR/$tfile of=/dev/null bs=1M count=1 iflag=direct"
+       verify_jobstats "$cmd" "ost"
+       # truncate
+       cmd="$TRUNCATE $DIR/$tfile 0"
+       verify_jobstats "$cmd" "both"
+       # rename
+       cmd="mv -f $DIR/$tfile $DIR/jobstats_test_rename"
+       verify_jobstats "$cmd" "mdt"
+
+       # cleanup
+       rm -f $DIR/jobstats_test_rename
+
+       # restore the jobid_var value that was in effect before the test
+       if [ $OLD_JOBENV != $JOBENV ]; then
+               do_facet mgs $LCTL conf_param $FSNAME.llite.jobid_var=$OLD_JOBENV
+               wait_update $HOSTNAME "$LCTL get_param -n llite.*.jobid_var" \
+                       $OLD_JOBENV || return 1
+       fi
+}
+run_test 205 "Verify job stats"
+
 test_212() {
        size=`date +%s`
        size=$((size % 8192 + 1))
index 2ca7f95..4b0458e 100644 (file)
@@ -320,11 +320,13 @@ check_ptlrpc_body(void)
         CHECK_MEMBER(ptlrpc_body, pb_conn_cnt);
         CHECK_MEMBER(ptlrpc_body, pb_timeout);
         CHECK_MEMBER(ptlrpc_body, pb_service_time);
-        CHECK_MEMBER(ptlrpc_body, pb_slv);
         CHECK_MEMBER(ptlrpc_body, pb_limit);
+       CHECK_MEMBER(ptlrpc_body, pb_slv);
         CHECK_CVALUE(PTLRPC_NUM_VERSIONS);
         CHECK_MEMBER(ptlrpc_body, pb_pre_versions[PTLRPC_NUM_VERSIONS]);
         CHECK_MEMBER(ptlrpc_body, pb_padding[4]);
+       CHECK_CVALUE(JOBSTATS_JOBID_SIZE);
+       CHECK_MEMBER(ptlrpc_body, pb_jobid);
 
         CHECK_VALUE(MSG_PTLRPC_BODY_OFF);
         CHECK_VALUE(REQ_REC_OFF);
index dcb42b6..b244e78 100644 (file)
@@ -645,7 +645,7 @@ void lustre_assert_wire_constants(void)
                  LUSTRE_MSG_MAGIC_V2_SWABBED);
 
         /* Checks for struct ptlrpc_body */
-        LASSERTF((int)sizeof(struct ptlrpc_body) == 152, "found %lld\n",
+       LASSERTF((int)sizeof(struct ptlrpc_body) == 184, " found %lld\n",
                  (long long)(int)sizeof(struct ptlrpc_body));
         LASSERTF((int)offsetof(struct ptlrpc_body, pb_handle) == 0, "found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_handle));
@@ -703,14 +703,14 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_service_time));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time));
-        LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, "found %lld\n",
-                 (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
-        LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, "found %lld\n",
-                 (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
-        LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, "found %lld\n",
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
+       LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
         CLASSERT(PTLRPC_NUM_VERSIONS == 4);
         LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, "found %lld\n",
                  (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
@@ -720,6 +720,12 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ptlrpc_body, pb_padding[4]));
         LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding[4]) == 8, "found %lld\n",
                  (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding[4]));
+       CLASSERT(JOBSTATS_JOBID_SIZE == 32);
+       LASSERTF((int)offsetof(struct ptlrpc_body, pb_jobid) == 152, " found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body, pb_jobid));
+       LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_jobid) == 32, " found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_jobid));
+
         LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n",
                  (long long)MSG_PTLRPC_BODY_OFF);
         LASSERTF(REQ_REC_OFF == 1, "found %lld\n",