Whamcloud - gitweb
Branch b1_6
authorbobijam <bobijam>
Fri, 6 Mar 2009 03:52:54 +0000 (03:52 +0000)
committerbobijam <bobijam>
Fri, 6 Mar 2009 03:52:54 +0000 (03:52 +0000)
b=17536
o=johann
i=zhenyu.xu (bobijam)
i=adilger

MDS create should not wait for statfs RPC while holding DLM lock.

lustre/ChangeLog
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/ptlrpc/ptlrpcd.c

index e7a995d..e04effb 100644 (file)
@@ -45,17 +45,21 @@ tbd Sun Microsystems, Inc.
          more information, please refer to bugzilla 17630.
 
 Severity   : enhancement
+Bugzilla   : 17536
+Description: MDS create should not wait for statfs RPC while holding DLM lock.
+
+Severity   : enhancement
 Bugzilla   : 18289
 Description: Update to RHEL5U3 kernel-2.6.18-128.1.1.el5.
 
 Severity   : normal
-Frequency  : normal 
+Frequency  : normal
 Bugzilla   : 12069
-Descriptoin: OST grant too much space to client even there are not enough space. 
+Description: OST grants too much space to clients even when there is not enough space.
 Details    : Client will shrink its grant cache to OST if there are no write
             activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve
             this grant cache if there are already not enough avaible space
-            (left_space < total_clients * 32M). 
+            (left_space < total_clients * 32M).
 
 Severity   : enhancement
 Bugzilla   : 14250
@@ -74,7 +78,7 @@ Frequency  : start MDS on uncleanly shutdowned MDS device
 Bugzilla   : 16839
 Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished
 Details    : stay in waiting mds<>ost recovery finished produce random bugs
-             due race between two ll_sync thread for one lov target. send 
+             due race between two ll_sync thread for one lov target. send
              ACTIVATE event only if connect realy finished and import have
              FULL state.
 
index 3b98b34..a1ae407 100644 (file)
@@ -589,9 +589,14 @@ struct obd_statfs {
 extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_STATFS_NODELAY      0x0001  /* requests should be send without delay
                                          * and resends for avoid deadlocks */
-
 #define OBD_STATFS_FROM_CACHE   0x0002  /* the statfs callback should not update
                                          * obd_osfs_age */
+#define OBD_STATFS_PTLRPCD      0x0004  /* requests will be sent via ptlrpcd
+                                         * instead of a specific set. This
+                                         * means that we cannot rely on the set
+                                         * interpret routine to be called.
+                                         * lov_statfs_fini() must thus be called
+                                         * by the request interpret routine */
 
 /* ost_body.data values for OST_BRW */
 
@@ -632,7 +637,7 @@ extern void lustre_swab_niobuf_remote (struct niobuf_remote *nbr);
 
 /* lock value block communicated between the filter and llite */
 
-/* OST_LVB_ERR_INIT is needed because the return code in rc is 
+/* OST_LVB_ERR_INIT is needed because the return code in rc is
  * negative, i.e. because ((MASK + rc) & MASK) != MASK. */
 #define OST_LVB_ERR_INIT 0xffbadbad80000000ULL
 #define OST_LVB_ERR_MASK 0xffbadbad00000000ULL
index 0b08b84..e0b7fc8 100644 (file)
@@ -695,7 +695,7 @@ struct ptlrpcd_ctl {
         char                        pc_name[16];
 #ifndef __KERNEL__
         /**
-         * Async rpcs flag to make sure that ptlrpcd_check() is called only 
+         * Async rpcs flag to make sure that ptlrpcd_check() is called only
          * once.
          */
         int                         pc_recurred;
@@ -971,19 +971,19 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
 {
         if (req->rq_phase == new_phase)
                 return;
-        
+
         if (new_phase == RQ_PHASE_UNREGISTERING) {
                 req->rq_next_phase = req->rq_phase;
                 if (req->rq_import)
                         atomic_inc(&req->rq_import->imp_unregistering);
         }
-        
+
         if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
                 if (req->rq_import)
                         atomic_dec(&req->rq_import->imp_unregistering);
         }
 
-        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", 
+        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
                   ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
 
         req->rq_phase = new_phase;
@@ -1110,7 +1110,7 @@ int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
 
 /* ptlrpc/pinger.c */
 enum timeout_event {
-        TIMEOUT_GRANT = 1 
+        TIMEOUT_GRANT = 1
 };
 struct timeout_item;
 typedef int (*timeout_cb_t)(struct timeout_item *, void *);
@@ -1135,6 +1135,7 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc);
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
 void ptlrpcd_add_req(struct ptlrpc_request *req);
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
 
index e7ce881..5092ef2 100644 (file)
@@ -190,7 +190,7 @@ struct obd_info {
          * level. E.g. it is used for update lsm->lsm_oinfo at every recieved
          * request in osc level for enqueue requests. It is also possible to
          * update some caller data from LOV layer if needed. */
-        obd_enqueue_update_f     oi_cb_up;
+        obd_enqueue_update_f    oi_cb_up;
 };
 
 /* compare all relevant fields. */
@@ -615,6 +615,10 @@ struct ltd_qos {
         unsigned int        ltq_usable:1;    /* usable for striping */
 };
 
+/*
+ * Storage for the QOS statfs refresh. qos_statfs_update() zeroes this
+ * structure, points lsd_oi.oi_osfs at lsd_statfs and passes lsd_oi to
+ * obd_statfs_async().
+ */
+struct lov_statfs_data {
+        struct obd_info   lsd_oi;       /* passed to obd_statfs_async() */
+        struct obd_statfs lsd_statfs;   /* target of lsd_oi.oi_osfs */
+};
 struct lov_qos {
         struct list_head    lq_oss_list;    /* list of OSSs that targets use */
         struct rw_semaphore lq_rw_sem;
@@ -627,7 +631,12 @@ struct lov_qos {
                             lq_dirty_rr:1,  /* recalc round-robin list */
                             lq_same_space:1,/* the ost's all have approx.
                                                the same space avail */
-                            lq_reset:1;     /* zero current penalties */
+                            lq_reset:1,     /* zero current penalties */
+                            lq_statfs_in_progress:1; /* statfs op in progress */
+        /* qos statfs data */
+        struct lov_statfs_data *lq_statfs_data;
+        cfs_waitq_t         lq_statfs_waitq; /* waitqueue to notify statfs
+                                              * requests completion */
 };
 
 struct lov_tgt_desc {
index ac8b3e1..f38e2e9 100644 (file)
@@ -178,6 +178,8 @@ int qos_del_tgt(struct obd_device *obd, struct lov_tgt_desc *tgt);
 void qos_shrink_lsm(struct lov_request_set *set);
 int qos_prep_create(struct obd_export *exp, struct lov_request_set *set);
 void qos_update(struct lov_obd *lov);
+void qos_statfs_done(struct lov_obd *lov);
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait);
 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
 
 /* lov_request.c */
@@ -248,6 +250,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                     int success);
 int lov_fini_statfs_set(struct lov_request_set *set);
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
 
 /* lov_obd.c */
 void lov_fix_desc(struct lov_desc *desc);
index 85449dd..04a79d4 100644 (file)
@@ -95,7 +95,7 @@ void lov_putref(struct obd_device *obd)
         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
                 int i;
                 struct lov_tgt_desc *n;
-                CDEBUG(D_CONFIG, "destroying %d lov targets\n", 
+                CDEBUG(D_CONFIG, "destroying %d lov targets\n",
                        lov->lov_death_row);
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                         tgt = lov->lov_tgts[i];
@@ -241,7 +241,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
-                                       obd->obd_name, 
+                                       obd->obd_name,
                                        obd->obd_observer->obd_name,
                                        tgt_obd->obd_name, rc);
                                 break;
@@ -254,7 +254,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
 }
 
 #define MAX_STRING_SIZE 128
-static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, 
+static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                            struct obd_connect_data *data)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -296,7 +296,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
 
         if (activate) {
                 tgt_obd->obd_no_recov = 0;
-                /* FIXME this is probably supposed to be 
+                /* FIXME this is probably supposed to be
                    ptlrpc_set_import_active.  Horrible naming. */
                 ptlrpc_activate_import(imp);
         }
@@ -385,7 +385,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
-                
+
         *exp = class_conn2export(conn);
 
         /* Why should there ever be more than 1 connect? */
@@ -405,7 +405,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate,
                                      &lov->lov_ocd);
                 if (rc) {
-                        CERROR("%s: lov connect tgt %d failed: %d\n", 
+                        CERROR("%s: lov connect tgt %d failed: %d\n",
                                obd->obd_name, i, rc);
                         continue;
                 }
@@ -434,7 +434,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
 
         ENTRY;
 
-        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
+        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
                obd->obd_name, osc_obd->obd_name);
 
         if (tgt->ltd_active) {
@@ -485,7 +485,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
         RETURN(0);
 }
 
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen);
 
 static int lov_disconnect(struct obd_export *exp)
@@ -616,7 +616,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 __u32 newsize, oldsize = 0;
 
                 newsize = max(lov->lov_tgt_size, (__u32)2);
-                while (newsize < index + 1) 
+                while (newsize < index + 1)
                         newsize = newsize << 1;
                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
                 if (newtgts == NULL) {
@@ -625,7 +625,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 }
 
                 if (lov->lov_tgt_size) {
-                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * 
+                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
                                lov->lov_tgt_size);
                         old = lov->lov_tgts;
                         oldsize = lov->lov_tgt_size;
@@ -663,8 +663,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 
         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
-        
-        if (lov->lov_connects == 0) { 
+
+        if (lov->lov_connects == 0) {
                 /* lov_connect hasn't been called yet. We'll do the
                    lov_connect_obd on this target when that fn first runs,
                    because we don't know the connect flags yet. */
@@ -681,13 +681,13 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         if (!tgt->ltd_exp)
                 GOTO(out, rc = 0);
 
-        rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 
+        rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
 
 out:
         if (rc) {
-                CERROR("add failed (%d), deleting %s\n", rc, 
+                CERROR("add failed (%d), deleting %s\n", rc,
                        obd_uuid2str(&tgt->ltd_uuid));
                 lov_del_target(obd, index, 0, 0);
         }
@@ -696,7 +696,7 @@ out:
 }
 
 /* Schedule a target for deletion */
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -726,7 +726,7 @@ static int lov_del_target(struct obd_device *obd, __u32 index,
 
         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
                lov_uuid2str(lov, index), index,
-               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, 
+               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
                lov->lov_tgts[index]->ltd_active);
 
         lov->lov_tgts[index]->ltd_reap = 1;
@@ -860,6 +860,11 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         lov->lov_qos.lq_prio_free = 232;
         /* Default threshold for rr (roughly 17%) */
         lov->lov_qos.lq_threshold_rr = 43;
+        /* Init statfs fields */
+        OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data);
+        if (NULL == lov->lov_qos.lq_statfs_data)
+                RETURN(-ENOMEM);
+        cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq);
 
         lprocfs_lov_init_vars(&lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
@@ -920,25 +925,26 @@ static int lov_cleanup(struct obd_device *obd)
                         if (lov->lov_tgts[i]) {
                                 /* Inactive targets may never have connected */
                                 if (lov->lov_tgts[i]->ltd_active ||
-                                    atomic_read(&lov->lov_refcount)) 
-                                        /* We should never get here - these 
-                                           should have been removed in the 
+                                    atomic_read(&lov->lov_refcount))
+                                        /* We should never get here - these
+                                           should have been removed in the
                                            disconnect. */
                                         CERROR("lov tgt %d not cleaned!"
                                                " deathrow=%d, lovrc=%d\n",
-                                               i, lov->lov_death_row, 
+                                               i, lov->lov_death_row,
                                                atomic_read(&lov->lov_refcount));
                                 lov_del_target(obd, i, 0, 0);
                         }
                 }
-                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * 
+                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
                          lov->lov_tgt_size);
                 lov->lov_tgt_size = 0;
         }
 
-        if (lov->lov_qos.lq_rr_size) 
+        if (lov->lov_qos.lq_rr_size)
                 OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size);
 
+        OBD_FREE_PTR(lov->lov_qos.lq_statfs_data);
         RETURN(0);
 }
 
@@ -977,12 +983,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
         case LCFG_PARAM: {
                 struct lprocfs_static_vars lvars = { 0 };
                 struct lov_desc *desc = &(obd->u.lov.desc);
-                
+
                 if (!desc)
                         GOTO(out, rc = -EINVAL);
-                
+
                 lprocfs_lov_init_vars(&lvars);
-                
+
                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
                                               lcfg, obd);
                 GOTO(out, rc);
@@ -1047,14 +1053,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
                         continue;
 
-                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, 
+                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
                        obd_uuid2str(ost_uuid));
 
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
 
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                err = obd_create(lov->lov_tgts[i]->ltd_exp, 
+                err = obd_create(lov->lov_tgts[i]->ltd_exp,
                                  tmp_oa, &obj_mdp, oti);
                 if (err) {
                         /* This export will be disabled until it is recovered,
@@ -1121,8 +1127,6 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         struct obd_info oinfo;
         struct lov_request_set *set = NULL;
         struct lov_request *req;
-        struct obd_statfs osfs;
-        __u64 maxage;
         int rc = 0;
         ENTRY;
 
@@ -1148,8 +1152,11 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                  GOTO(out, rc);
         }
 
-        maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
-        obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
+        /* issue statfs rpcs if the osfs data is older than qos_maxage - 1s,
+         * later in alloc_qos(), we will wait for those rpcs to complete if
+         * the osfs age is older than 2 * qos_maxage */
+        qos_statfs_update(exp->exp_obd,
+                          cfs_time_shift_64(-lov->desc.ld_qos_maxage) + HZ, 0);
 
         rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
         if (rc)
@@ -1257,7 +1264,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
 
                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
@@ -1278,7 +1285,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
         RETURN(rc);
 }
 
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, 
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
                                  void *data, int rc)
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
@@ -1315,14 +1322,14 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
 
         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
-               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, 
+               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
                oinfo->oi_md->lsm_stripe_size);
 
         list_for_each (pos, &lovset->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                        &req->rq_oi, rqset);
@@ -1366,12 +1373,12 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(-ENODEV);
 
         /* for now, we only expect the following updates here */
-        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | 
-                                            OBD_MD_FLMODE | OBD_MD_FLATIME | 
+        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
+                                            OBD_MD_FLMODE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE | 
-                                            OBD_MD_FLGROUP | OBD_MD_FLUID | 
-                                            OBD_MD_FLGID | OBD_MD_FLFID | 
+                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
+                                            OBD_MD_FLGROUP | OBD_MD_FLUID |
+                                            OBD_MD_FLGID | OBD_MD_FLFID |
                                             OBD_MD_FLGENER)));
         lov = &exp->exp_obd->u.lov;
         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
@@ -1381,13 +1388,13 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
         list_for_each (pos, &set->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, 
+                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                  &req->rq_oi, NULL);
                 err = lov_update_setattr_set(set, req, rc);
                 if (err) {
                         CERROR("error: setattr objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
-                               set->set_oi->oi_oa->o_id, 
+                               set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
                         if (!rc)
                                 rc = err;
@@ -1571,8 +1578,8 @@ static int lov_sync(struct obd_export *exp, struct obdo *oa,
         list_for_each (pos, &set->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, 
-                              req->rq_oi.oi_oa, NULL, 
+                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
+                              req->rq_oi.oi_oa, NULL,
                               req->rq_oi.oi_policy.l_extent.start,
                               req->rq_oi.oi_policy.l_extent.end);
                 err = lov_update_common_set(set, req, rc);
@@ -1799,12 +1806,12 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!page) {
                 int i = 0;
                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
-                   Only because of this layering limitation will a client 
+                   Only because of this layering limitation will a client
                    mount with no osts fail */
-                while (!lov->lov_tgts || !lov->lov_tgts[i] || 
+                while (!lov->lov_tgts || !lov->lov_tgts[i] ||
                        !lov->lov_tgts[i]->ltd_exp) {
                         i++;
-                        if (i >= lov->desc.ld_tgt_count) 
+                        if (i >= lov->desc.ld_tgt_count)
                                 RETURN(-ENOMEDIUM);
                 }
                 rc = size_round(sizeof(*lap)) +
@@ -1939,7 +1946,7 @@ static int lov_trigger_group_io(struct obd_export *exp,
 
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -2265,8 +2272,7 @@ static int lov_join_lru(struct obd_export *exp,
         RETURN(count);
 }
 
-static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
-                                void *data, int rc)
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
         int err;
@@ -2390,7 +2396,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 genp = (__u32 *)data->ioc_inlbuf3;
                 /* the uuid will be empty for deleted OSTs */
                 for (i = 0; i < count; i++, uuidp++, genp++) {
-                        if (!lov->lov_tgts[i]) 
+                        if (!lov->lov_tgts[i])
                                 continue;
                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
                         *genp = lov->lov_tgts[i]->ltd_gen;
@@ -3189,7 +3195,7 @@ int __init lov_init(void)
         ENTRY;
 
         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
-                                              sizeof(struct lov_oinfo), 
+                                              sizeof(struct lov_oinfo),
                                               0, SLAB_HWCACHE_ALIGN);
         if (lov_oinfo_slab == NULL)
                 return -ENOMEM;
index a301849..1532f50 100644 (file)
@@ -683,6 +683,11 @@ static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt,
                 RETURN(-EINVAL);
 
         lov_getref(exp->exp_obd);
+        /* wait for fresh statfs info if needed, the rpcs are sent in
+         * lov_create() */
+        qos_statfs_update(exp->exp_obd,
+                          cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1);
+
         down_write(&lov->lov_qos.lq_rw_sem);
 
         ost_count = lov->desc.ld_tgt_count;
@@ -987,3 +992,110 @@ void qos_update(struct lov_obd *lov)
         ENTRY;
         lov->lov_qos.lq_dirty = 1;
 }
+
+/*
+ * Mark the QOS statfs refresh as finished.
+ *
+ * Asserts that a refresh was in progress, then clears lq_statfs_in_progress
+ * and signals lq_statfs_waitq while holding lq_rw_sem for writing.
+ * cb_statfs_update() calls this for OBD_STATFS_PTLRPCD sets once set_count
+ * equals set_completes.
+ */
+void qos_statfs_done(struct lov_obd *lov)
+{
+        LASSERT(lov->lov_qos.lq_statfs_in_progress);
+        down_write(&lov->lov_qos.lq_rw_sem);
+        lov->lov_qos.lq_statfs_in_progress = 0;
+        /* wake up any threads waiting for the statfs rpcs to complete */
+        cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        up_write(&lov->lov_qos.lq_rw_sem);
+}
+
+/*
+ * Wake-up condition used by qos_statfs_update() in l_wait_event().
+ *
+ * Returns non-zero when no statfs refresh is in progress, or when
+ * cfs_time_beforeq_64(max_age, obd->obd_osfs_age) holds, i.e. the same
+ * "data are recent enough" test that qos_statfs_update() applies on entry.
+ * Both values are read with lq_rw_sem held for reading.
+ */
+static int qos_statfs_ready(struct obd_device *obd, __u64 max_age)
+{
+        struct lov_obd         *lov = &obd->u.lov;
+        int rc;
+        ENTRY;
+        down_read(&lov->lov_qos.lq_rw_sem);
+        rc = lov->lov_qos.lq_statfs_in_progress == 0 ||
+             cfs_time_beforeq_64(max_age, obd->obd_osfs_age);
+        up_read(&lov->lov_qos.lq_rw_sem);
+        RETURN(rc);
+}
+
+/*
+ * Update statfs data if the current osfs age is older than max_age.
+ * If wait is not set, it means that we are called from lov_create()
+ * and we should just issue the rpcs without waiting for them to complete.
+ * If wait is set, we are called from alloc_qos() and we just have
+ * to wait for the request set to complete.
+ *
+ * At most one refresh is started at a time: lq_statfs_in_progress is
+ * claimed under lq_rw_sem, and a caller that finds it already set sends
+ * nothing. A caller with wait set that finds no refresh in flight sends
+ * the rpcs itself before sleeping.
+ *
+ * The requests are moved to ptlrpcd with ptlrpcd_add_rqset() and the local
+ * set is destroyed afterwards. When sending fails, or the set ends up
+ * empty, the in-progress flag is cleared here, waiters are woken and the
+ * caller does not sleep.
+ *
+ * obd:     device whose obd_osfs_age is checked and whose u.lov is used
+ * max_age: time value compared against obd->obd_osfs_age
+ * wait:    non-zero to sleep on lq_statfs_waitq until qos_statfs_ready()
+ */
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait)
+{
+        struct lov_obd         *lov = &obd->u.lov;
+        struct obd_info        *oinfo;
+        int                     rc = 0;
+        struct ptlrpc_request_set *set = NULL;
+        ENTRY;
+
+        if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
+                /* statfs data are quite recent, don't need to refresh it */
+                RETURN_EXIT;
+
+        if (!wait && lov->lov_qos.lq_statfs_in_progress)
+                /* statfs already in progress (unlocked peek; the flag is
+                 * checked again under lq_rw_sem below) */
+                RETURN_EXIT;
+
+        down_write(&lov->lov_qos.lq_rw_sem);
+        if (lov->lov_qos.lq_statfs_in_progress) {
+                up_write(&lov->lov_qos.lq_rw_sem);
+                GOTO(out, rc = 0);
+        }
+        /* no statfs in flight, send rpcs; the flag is set before the lock
+         * is dropped so that a concurrent caller takes the branch above */
+        lov->lov_qos.lq_statfs_in_progress = 1;
+        up_write(&lov->lov_qos.lq_rw_sem);
+
+        if (wait)
+                CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data "
+                       "in a timely manner (osfs age "LPU64", max age "LPU64")"
+                       ", sending new statfs rpcs\n",
+                       obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age,
+                       max_age);
+
+        /* need to send statfs rpcs */
+        CDEBUG(D_QOS, "sending new statfs requests\n");
+        memset(lov->lov_qos.lq_statfs_data, 0,
+               sizeof(*lov->lov_qos.lq_statfs_data));
+        oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi;
+        oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs;
+        oinfo->oi_flags = OBD_STATFS_NODELAY;
+        set = ptlrpc_prep_set();
+        if (!set)
+                GOTO(out_failed, rc = -ENOMEM);
+
+        rc = obd_statfs_async(obd, oinfo, max_age, set);
+        if (rc || list_empty(&set->set_requests)) {
+                if (rc)
+                        CWARN("statfs failed with %d\n", rc);
+                GOTO(out_failed, rc);
+        }
+        /* send requests via ptlrpcd; cb_statfs_update() tests
+         * OBD_STATFS_PTLRPCD to decide whether it must call
+         * lov_statfs_interpret() and qos_statfs_done() itself */
+        oinfo->oi_flags |= OBD_STATFS_PTLRPCD;
+        ptlrpcd_add_rqset(set);
+        GOTO(out, rc);
+
+out_failed:
+        down_write(&lov->lov_qos.lq_rw_sem);
+        lov->lov_qos.lq_statfs_in_progress = 0;
+        /* wake up any threads waiting for the statfs rpcs to complete */
+        cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        up_write(&lov->lov_qos.lq_rw_sem);
+        wait = 0; /* nothing was queued, so there is nothing to wait for */
+out:
+        if (set)
+                ptlrpc_set_destroy(set); /* success and failure paths */
+        if (wait) {
+                /* NOTE(review): zeroed l_wait_info, presumably no timeout
+                 * and no interruption - confirm against l_wait_event() */
+                struct l_wait_info lwi = { 0 };
+                CDEBUG(D_QOS, "waiting for statfs requests to complete\n");
+                l_wait_event(lov->lov_qos.lq_statfs_waitq,
+                             qos_statfs_ready(obd, max_age), &lwi);
+                if (cfs_time_before_64(obd->obd_osfs_age, max_age))
+                        CDEBUG(D_QOS, "%s: still no fresh statfs data after "
+                                      "waiting (osfs age "LPU64", max age "
+                                      LPU64")\n",
+                                      obd_uuid2str(&lov->desc.ld_uuid),
+                                      obd->obd_osfs_age, max_age);
+        }
+}
index c8caa46..6b27291 100644 (file)
@@ -113,7 +113,7 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -874,7 +874,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                         continue;
 
                 loi = oinfo->oi_md->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
@@ -1073,7 +1073,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
 
                 loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -1132,7 +1132,7 @@ int lov_update_setattr_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -1589,7 +1589,7 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc)
                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
                         rc = 0;
-                RETURN(rc);
+                GOTO(out, rc);
         }
 
         spin_lock(&obd->obd_osfs_lock);
@@ -1600,6 +1600,14 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc)
 
         lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+out:
+        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+               lov_statfs_interpret(NULL, lovreq->rq_rqset,
+                                    lovreq->rq_rqset->set_success !=
+                                                  lovreq->rq_rqset->set_count);
+               qos_statfs_done(lov);
+        }
 
         RETURN(0);
 }
index ff316c5..8224188 100644 (file)
@@ -69,7 +69,29 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
-/* 
+/*
+ * Move every request of @set onto the ptlrpcd queue via ptlrpcd_add_req().
+ * Each request must still be in RQ_PHASE_NEW; @set itself is not freed here.
+ */
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
+{
+        struct list_head *tmp, *pos;
+
+        list_for_each_safe(pos, tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(pos, struct ptlrpc_request, rq_set_chain);
+
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                list_del_init(&req->rq_set_chain); /* unlink from @set */
+                req->rq_set = NULL;                /* detached from @set */
+                ptlrpcd_add_req(req);
+                set->set_remaining--;
+        }
+        LASSERT(set->set_remaining == 0);          /* nothing left behind */
+}
+EXPORT_SYMBOL(ptlrpcd_add_rqset);
+
+/*
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
@@ -87,7 +109,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
         if (rc) {
                 int (*interpreter)(struct ptlrpc_request *,
                                    void *, int);
-                                   
+
                 interpreter = req->rq_interpret_reply;
 
                 /*
@@ -117,8 +139,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                 list_del_init(&req->rq_set_chain);
                 ptlrpc_set_add_req(pc->pc_set, req);
-                /* 
-                 * Need to calculate its timeout. 
+                /*
+                 * Need to calculate its timeout.
                  */
                 rc = 1;
         }
@@ -127,9 +149,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         if (pc->pc_set->set_remaining) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
-                /* 
+                /*
                  * XXX: our set never completes, so we prune the completed
-                 * reqs after each iteration. boy could this be smarter. 
+                 * reqs after each iteration. boy could this be smarter.
                  */
                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                         req = list_entry(pos, struct ptlrpc_request,
@@ -144,8 +166,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         }
 
         if (rc == 0) {
-                /* 
-                 * If new requests have been added, make sure to wake up. 
+                /*
+                 * If new requests have been added, make sure to wake up.
                  */
                 spin_lock(&pc->pc_set->set_new_req_lock);
                 rc = !list_empty(&pc->pc_set->set_new_requests);
@@ -156,7 +178,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 }
 
 #ifdef __KERNEL__
-/* 
+/*
  * ptlrpc's code paths like to execute in process context, so we have this
  * thread which spins on a set which contains the io rpcs. llite specifies
  * ptlrpcd's set when it pushes pages down into the oscs.
@@ -174,18 +196,18 @@ static int ptlrpcd(void *arg)
 
         complete(&pc->pc_starting);
 
-        /* 
+        /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
          * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
-         * there are requests in the set. New requests come in on the set's 
-         * new_req_list and ptlrpcd_check() moves them into the set. 
+         * there are requests in the set. New requests come in on the set's
+         * new_req_list and ptlrpcd_check() moves them into the set.
          */
         do {
                 struct l_wait_info lwi;
                 int timeout;
 
                 timeout = ptlrpc_set_next_timeout(pc->pc_set);
-                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), 
+                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
                                   ptlrpc_expired_set, pc->pc_set);
 
                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
@@ -199,14 +221,14 @@ static int ptlrpcd(void *arg)
                         exit++;
                 }
 
-                /* 
+                /*
                  * Let's make one more loop to make sure that ptlrpcd_check()
                  * copied all raced new rpcs into the set so we can kill them.
                  */
         } while (exit < 2);
 
-        /* 
-         * Wait for inflight requests to drain. 
+        /*
+         * Wait for inflight requests to drain.
          */
         if (!list_empty(&pc->pc_set->set_requests))
                 ptlrpc_set_wait(pc->pc_set);
@@ -226,8 +248,8 @@ int ptlrpcd_check_async_rpcs(void *arg)
         struct ptlrpcd_ctl *pc = arg;
         int                  rc = 0;
 
-        /* 
-         * Single threaded!! 
+        /*
+         * Single threaded!!
          */
         pc->pc_recurred++;
 
@@ -235,8 +257,8 @@ int ptlrpcd_check_async_rpcs(void *arg)
                 rc = ptlrpcd_check(pc);
                 if (!rc)
                         ptlrpc_expired_set(pc->pc_set);
-                /* 
-                 * XXX: send replay requests. 
+                /*
+                 * XXX: send replay requests.
                  */
                 if (pc == &ptlrpcd_recovery_pc)
                         rc = ptlrpcd_check(pc);
@@ -260,9 +282,9 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 {
         int rc = 0;
         ENTRY;
-        /* 
-         * Do not allow start second thread for one pc. 
+
+        /*
+         * Do not allow starting a second thread for one pc.
          */
         if (test_bit(LIOD_START, &pc->pc_flags)) {
                 CERROR("Starting second thread (%s) for same pc %p\n",