Whamcloud - gitweb
Branch b1_6
authorbobijam <bobijam>
Fri, 6 Mar 2009 04:06:22 +0000 (04:06 +0000)
committerbobijam <bobijam>
Fri, 6 Mar 2009 04:06:22 +0000 (04:06 +0000)
b=17536
o=johann
i=zhenyu.xu (bobijam)
i=adilger

MDS create should not wait for statfs RPC while holding DLM lock.

lustre/ChangeLog
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/ptlrpc/ptlrpcd.c

index 6f50845..3846142 100644 (file)
@@ -31,14 +31,18 @@ tbd Sun Microsystems, Inc.
          of Lustre filesystem with 4K stack may cause a stack overflow. For
          more information, please refer to bugzilla 17630.
 
+Severity   : enhancement
+Bugzilla   : 17536
+Description: MDS create should not wait for statfs RPC while holding DLM lock.
+
 Severity   : normal
-Frequency  : normal 
+Frequency  : normal
 Bugzilla   : 12069
-Descriptoin: OST grant too much space to client even there are not enough space. 
+Descriptoin: OST grant too much space to client even there are not enough space.
 Details    : Client will shrink its grant cache to OST if there are no write
             activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve
             this grant cache if there are already not enough avaible space
-            (left_space < total_clients * 32M). 
+            (left_space < total_clients * 32M).
 
 Severity   : enhancement
 Bugzilla   : 18289
@@ -68,7 +72,7 @@ Frequency  : start MDS on uncleanly shutdowned MDS device
 Bugzilla   : 16839
 Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished
 Details    : stay in waiting mds<>ost recovery finished produce random bugs
-             due race between two ll_sync thread for one lov target. send 
+             due race between two ll_sync thread for one lov target. send
              ACTIVATE event only if connect realy finished and import have
              FULL state.
 
index 1c4d9a3..b8456a8 100644 (file)
@@ -627,9 +627,14 @@ struct obd_statfs {
 extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_STATFS_NODELAY      0x0001  /* requests should be send without delay
                                          * and resends for avoid deadlocks */
-
 #define OBD_STATFS_FROM_CACHE   0x0002  /* the statfs callback should not update
                                          * obd_osfs_age */
+#define OBD_STATFS_PTLRPCD      0x0004  /* requests will be sent via ptlrpcd
+                                         * instead of a specific set. This
+                                         * means that we cannot rely on the set
+                                         * interpret routine to be called.
+                                         * lov_statfs_fini() must thus be called
+                                         * by the request interpret routine */
 
 /* ost_body.data values for OST_BRW */
 
@@ -790,7 +795,7 @@ struct lu_fid {
         fid_oid(fid), \
         fid_ver(fid)
 
-enum { 
+enum {
         /** put FID sequence at this offset in ldlm_res_id. */
         LUSTRE_RES_ID_SEQ_OFF = 0,
         /** put FID oid at this offset in ldlm_res_id. */
@@ -950,7 +955,7 @@ static inline void fid_init(struct lu_fid *fid)
 /**
  * Check if a fid is igif or not.
  * \param fid the fid to be tested.
- * \return true if the fid is a igif; otherwise false. 
+ * \return true if the fid is a igif; otherwise false.
  */
 static inline int fid_is_igif(const struct lu_fid *fid)
 {
@@ -960,7 +965,7 @@ static inline int fid_is_igif(const struct lu_fid *fid)
 /**
  * Check if a fid is idif or not.
  * \param fid the fid to be tested.
- * \return true if the fid is a idif; otherwise false. 
+ * \return true if the fid is a idif; otherwise false.
  */
 static inline int fid_is_idif(const struct lu_fid *fid)
 {
@@ -993,7 +998,7 @@ static inline int fid_is_sane(const struct lu_fid *fid)
 /**
  * Check if a fid is zero.
  * \param fid the fid to be tested.
- * \return true if the fid is zero; otherwise false. 
+ * \return true if the fid is zero; otherwise false.
  */
 static inline int fid_is_zero(const struct lu_fid *fid)
 {
@@ -1014,7 +1019,7 @@ static inline ino_t lu_igif_ino(const struct lu_fid *fid)
  * Get inode generation from a igif.
  * \param fid a igif to get inode generation from.
  * \return inode generation for the igif.
- */ 
+ */
 static inline __u32 lu_igif_gen(const struct lu_fid *fid)
 {
         return fid_oid(fid);
index 9db4bb1..60d0977 100644 (file)
@@ -695,7 +695,7 @@ struct ptlrpcd_ctl {
         char                        pc_name[16];
 #ifndef __KERNEL__
         /**
-         * Async rpcs flag to make sure that ptlrpcd_check() is called only 
+         * Async rpcs flag to make sure that ptlrpcd_check() is called only
          * once.
          */
         int                         pc_recurred;
@@ -974,19 +974,19 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
 {
         if (req->rq_phase == new_phase)
                 return;
-        
+
         if (new_phase == RQ_PHASE_UNREGISTERING) {
                 req->rq_next_phase = req->rq_phase;
                 if (req->rq_import)
                         atomic_inc(&req->rq_import->imp_unregistering);
         }
-        
+
         if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
                 if (req->rq_import)
                         atomic_dec(&req->rq_import->imp_unregistering);
         }
 
-        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", 
+        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
                   ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
 
         req->rq_phase = new_phase;
@@ -1113,7 +1113,7 @@ int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
 
 /* ptlrpc/pinger.c */
 enum timeout_event {
-        TIMEOUT_GRANT = 1 
+        TIMEOUT_GRANT = 1
 };
 struct timeout_item;
 typedef int (*timeout_cb_t)(struct timeout_item *, void *);
@@ -1138,6 +1138,7 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc);
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
 void ptlrpcd_add_req(struct ptlrpc_request *req);
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
 
index 242f92d..7f31c6c 100644 (file)
@@ -192,7 +192,7 @@ struct obd_info {
          * level. E.g. it is used for update lsm->lsm_oinfo at every recieved
          * request in osc level for enqueue requests. It is also possible to
          * update some caller data from LOV layer if needed. */
-        obd_enqueue_update_f     oi_cb_up;
+        obd_enqueue_update_f    oi_cb_up;
 };
 
 /* compare all relevant fields. */
@@ -661,6 +661,10 @@ struct lov_qos_rr {
         unsigned long       lqr_dirty:1;     /* recalc round-robin list */
 };
 
+struct lov_statfs_data {
+        struct obd_info   lsd_oi;
+        struct obd_statfs lsd_statfs;
+};
 /* Stripe placement optimization */
 struct lov_qos {
         struct list_head    lq_oss_list;    /* list of OSSs that targets use */
@@ -672,7 +676,12 @@ struct lov_qos {
         unsigned long       lq_dirty:1,     /* recalc qos data */
                             lq_same_space:1,/* the ost's all have approx.
                                                the same space avail */
-                            lq_reset:1;     /* zero current penalties */
+                            lq_reset:1,     /* zero current penalties */
+                            lq_statfs_in_progress:1; /* statfs op in progress */
+        /* qos statfs data */
+        struct lov_statfs_data *lq_statfs_data;
+        cfs_waitq_t         lq_statfs_waitq; /* waitqueue to notify statfs
+                                              * requests completion */
 };
 
 struct lov_tgt_desc {
index d7a8f07..f19e5dd 100644 (file)
@@ -180,6 +180,8 @@ int qos_del_tgt(struct obd_device *obd, struct lov_tgt_desc *tgt);
 void qos_shrink_lsm(struct lov_request_set *set);
 int qos_prep_create(struct obd_export *exp, struct lov_request_set *set);
 void qos_update(struct lov_obd *lov);
+void qos_statfs_done(struct lov_obd *lov);
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait);
 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
 
 /* lov_request.c */
@@ -249,6 +251,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                     int success);
 int lov_fini_statfs_set(struct lov_request_set *set);
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
 
 /* lov_obd.c */
 void lov_fix_desc(struct lov_desc *desc);
index 9b8b014..45ddf6e 100644 (file)
@@ -95,7 +95,7 @@ void lov_putref(struct obd_device *obd)
         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
                 int i;
                 struct lov_tgt_desc *n;
-                CDEBUG(D_CONFIG, "destroying %d lov targets\n", 
+                CDEBUG(D_CONFIG, "destroying %d lov targets\n",
                        lov->lov_death_row);
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                         tgt = lov->lov_tgts[i];
@@ -242,7 +242,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
-                                       obd->obd_name, 
+                                       obd->obd_name,
                                        obd->obd_observer->obd_name,
                                        tgt_obd->obd_name, rc);
                                 break;
@@ -255,7 +255,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
 }
 
 #define MAX_STRING_SIZE 128
-static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, 
+static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                            struct obd_connect_data *data)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -297,7 +297,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
 
         if (activate) {
                 tgt_obd->obd_no_recov = 0;
-                /* FIXME this is probably supposed to be 
+                /* FIXME this is probably supposed to be
                    ptlrpc_set_import_active.  Horrible naming. */
                 ptlrpc_activate_import(imp);
         }
@@ -386,7 +386,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
-                
+
         *exp = class_conn2export(conn);
 
         /* Why should there ever be more than 1 connect? */
@@ -406,7 +406,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate,
                                      &lov->lov_ocd);
                 if (rc) {
-                        CERROR("%s: lov connect tgt %d failed: %d\n", 
+                        CERROR("%s: lov connect tgt %d failed: %d\n",
                                obd->obd_name, i, rc);
                         continue;
                 }
@@ -435,7 +435,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
 
         ENTRY;
 
-        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
+        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
                obd->obd_name, osc_obd->obd_name);
 
         if (tgt->ltd_active) {
@@ -486,7 +486,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
         RETURN(0);
 }
 
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen);
 
 static int lov_disconnect(struct obd_export *exp)
@@ -617,7 +617,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 __u32 newsize, oldsize = 0;
 
                 newsize = max(lov->lov_tgt_size, (__u32)2);
-                while (newsize < index + 1) 
+                while (newsize < index + 1)
                         newsize = newsize << 1;
                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
                 if (newtgts == NULL) {
@@ -626,7 +626,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 }
 
                 if (lov->lov_tgt_size) {
-                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * 
+                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
                                lov->lov_tgt_size);
                         old = lov->lov_tgts;
                         oldsize = lov->lov_tgt_size;
@@ -668,8 +668,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 
         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
-        
-        if (lov->lov_connects == 0) { 
+
+        if (lov->lov_connects == 0) {
                 /* lov_connect hasn't been called yet. We'll do the
                    lov_connect_obd on this target when that fn first runs,
                    because we don't know the connect flags yet. */
@@ -686,13 +686,13 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         if (!tgt->ltd_exp)
                 GOTO(out, rc = 0);
 
-        rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 
+        rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
 
 out:
         if (rc) {
-                CERROR("add failed (%d), deleting %s\n", rc, 
+                CERROR("add failed (%d), deleting %s\n", rc,
                        obd_uuid2str(&tgt->ltd_uuid));
                 lov_del_target(obd, index, 0, 0);
         }
@@ -701,7 +701,7 @@ out:
 }
 
 /* Schedule a target for deletion */
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -731,7 +731,7 @@ static int lov_del_target(struct obd_device *obd, __u32 index,
 
         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
                lov_uuid2str(lov, index), index,
-               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, 
+               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
                lov->lov_tgts[index]->ltd_active);
 
         lov->lov_tgts[index]->ltd_reap = 1;
@@ -868,6 +868,11 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         lov->lov_qos.lq_prio_free = 232;
         /* Default threshold for rr (roughly 17%) */
         lov->lov_qos.lq_threshold_rr = 43;
+        /* Init statfs fields */
+        OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data);
+        if (NULL == lov->lov_qos.lq_statfs_data)
+                RETURN(-ENOMEM);
+        cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq);
 
         lov->lov_pools_hash_body = lustre_hash_init("POOLS", 7, 7,
                                                     &pool_hash_operations, 0);
@@ -958,19 +963,19 @@ static int lov_cleanup(struct obd_device *obd)
                         if (lov->lov_tgts[i]) {
                                 /* Inactive targets may never have connected */
                                 if (lov->lov_tgts[i]->ltd_active ||
-                                    atomic_read(&lov->lov_refcount)) 
-                                        /* We should never get here - these 
-                                           should have been removed in the 
+                                    atomic_read(&lov->lov_refcount))
+                                        /* We should never get here - these
+                                           should have been removed in the
                                            disconnect. */
                                         CERROR("lov tgt %d not cleaned!"
                                                " deathrow=%d, lovrc=%d\n",
-                                               i, lov->lov_death_row, 
+                                               i, lov->lov_death_row,
                                                atomic_read(&lov->lov_refcount));
                                 lov_del_target(obd, i, 0, 0);
                         }
                 }
                 lov_putref(obd);
-                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * 
+                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
                          lov->lov_tgt_size);
                 lov->lov_tgt_size = 0;
         }
@@ -978,6 +983,7 @@ static int lov_cleanup(struct obd_device *obd)
         /* clear pools parent proc entry only after all pools is killed */
         lprocfs_obd_cleanup(obd);
 
+        OBD_FREE_PTR(lov->lov_qos.lq_statfs_data);
         RETURN(0);
 }
 
@@ -1016,12 +1022,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
         case LCFG_PARAM: {
                 struct lprocfs_static_vars lvars = { 0 };
                 struct lov_desc *desc = &(obd->u.lov.desc);
-                
+
                 if (!desc)
                         GOTO(out, rc = -EINVAL);
-                
+
                 lprocfs_lov_init_vars(&lvars);
-                
+
                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
                                               lcfg, obd);
                 GOTO(out, rc);
@@ -1092,14 +1098,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
                         continue;
 
-                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, 
+                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
                        obd_uuid2str(ost_uuid));
 
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
 
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                err = obd_create(lov->lov_tgts[i]->ltd_exp, 
+                err = obd_create(lov->lov_tgts[i]->ltd_exp,
                                  tmp_oa, &obj_mdp, oti);
                 if (err) {
                         /* This export will be disabled until it is recovered,
@@ -1166,8 +1172,6 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         struct obd_info oinfo;
         struct lov_request_set *set = NULL;
         struct lov_request *req;
-        struct obd_statfs osfs;
-        __u64 maxage;
         int rc = 0;
         ENTRY;
 
@@ -1193,8 +1197,11 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                  GOTO(out, rc);
         }
 
-        maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
-        obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
+        /* issue statfs rpcs if the osfs data is older than qos_maxage - 1s,
+         * later in alloc_qos(), we will wait for those rpcs to complete if
+         * the osfs age is older than 2 * qos_maxage */
+        qos_statfs_update(exp->exp_obd,
+                          cfs_time_shift_64(-lov->desc.ld_qos_maxage) + HZ, 0);
 
         rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
         if (rc)
@@ -1303,7 +1310,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
 
                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
@@ -1324,7 +1331,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
         RETURN(rc);
 }
 
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, 
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
                                  void *data, int rc)
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
@@ -1361,14 +1368,14 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
 
         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
-               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, 
+               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
                oinfo->oi_md->lsm_stripe_size);
 
         list_for_each (pos, &lovset->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                        &req->rq_oi, rqset);
@@ -1412,12 +1419,12 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(-ENODEV);
 
         /* for now, we only expect the following updates here */
-        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | 
-                                            OBD_MD_FLMODE | OBD_MD_FLATIME | 
+        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
+                                            OBD_MD_FLMODE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE | 
-                                            OBD_MD_FLGROUP | OBD_MD_FLUID | 
-                                            OBD_MD_FLGID | OBD_MD_FLFID | 
+                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
+                                            OBD_MD_FLGROUP | OBD_MD_FLUID |
+                                            OBD_MD_FLGID | OBD_MD_FLFID |
                                             OBD_MD_FLGENER)));
         lov = &exp->exp_obd->u.lov;
         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
@@ -1427,13 +1434,13 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
         list_for_each (pos, &set->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, 
+                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                  &req->rq_oi, NULL);
                 err = lov_update_setattr_set(set, req, rc);
                 if (err) {
                         CERROR("error: setattr objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
-                               set->set_oi->oi_oa->o_id, 
+                               set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
                         if (!rc)
                                 rc = err;
@@ -1866,12 +1873,12 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!page) {
                 int i = 0;
                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
-                   Only because of this layering limitation will a client 
+                   Only because of this layering limitation will a client
                    mount with no osts fail */
-                while (!lov->lov_tgts || !lov->lov_tgts[i] || 
+                while (!lov->lov_tgts || !lov->lov_tgts[i] ||
                        !lov->lov_tgts[i]->ltd_exp) {
                         i++;
-                        if (i >= lov->desc.ld_tgt_count) 
+                        if (i >= lov->desc.ld_tgt_count)
                                 RETURN(-ENOMEDIUM);
                 }
                 rc = size_round(sizeof(*lap)) +
@@ -2006,7 +2013,7 @@ static int lov_trigger_group_io(struct obd_export *exp,
 
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -2333,8 +2340,7 @@ static int lov_join_lru(struct obd_export *exp,
         RETURN(count);
 }
 
-static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
-                                void *data, int rc)
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
         int err;
@@ -2458,7 +2464,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 genp = (__u32 *)data->ioc_inlbuf3;
                 /* the uuid will be empty for deleted OSTs */
                 for (i = 0; i < count; i++, uuidp++, genp++) {
-                        if (!lov->lov_tgts[i]) 
+                        if (!lov->lov_tgts[i])
                                 continue;
                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
                         *genp = lov->lov_tgts[i]->ltd_gen;
@@ -3261,7 +3267,7 @@ int __init lov_init(void)
         ENTRY;
 
         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
-                                              sizeof(struct lov_oinfo), 
+                                              sizeof(struct lov_oinfo),
                                               0, SLAB_HWCACHE_ALIGN);
         if (lov_oinfo_slab == NULL)
                 return -ENOMEM;
index 61025ef..3193642 100644 (file)
@@ -47,6 +47,7 @@
 
 #include <obd_class.h>
 #include <obd_lov.h>
+#include <lustre/lustre_idl.h>
 #include "lov_internal.h"
 
 /* #define QOS_DEBUG 1 */
@@ -752,6 +753,11 @@ static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt,
         }
 
         lov_getref(exp->exp_obd);
+        /* wait for fresh statfs info if needed, the rpcs are sent in
+         * lov_create() */
+        qos_statfs_update(exp->exp_obd,
+                          cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1);
+
         down_write(&lov->lov_qos.lq_rw_sem);
 
         if (lov->desc.ld_active_tgt_count < 2)
@@ -1074,3 +1080,110 @@ void qos_update(struct lov_obd *lov)
         ENTRY;
         lov->lov_qos.lq_dirty = 1;
 }
+
+void qos_statfs_done(struct lov_obd *lov)
+{
+        LASSERT(lov->lov_qos.lq_statfs_in_progress);
+        down_write(&lov->lov_qos.lq_rw_sem);
+        lov->lov_qos.lq_statfs_in_progress = 0;
+        /* wake up any threads waiting for the statfs rpcs to complete */
+        cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        up_write(&lov->lov_qos.lq_rw_sem);
+}
+
+static int qos_statfs_ready(struct obd_device *obd, __u64 max_age)
+{
+        struct lov_obd         *lov = &obd->u.lov;
+        int rc;
+        ENTRY;
+        down_read(&lov->lov_qos.lq_rw_sem);
+        rc = lov->lov_qos.lq_statfs_in_progress == 0 ||
+             cfs_time_beforeq_64(max_age, obd->obd_osfs_age);
+        up_read(&lov->lov_qos.lq_rw_sem);
+        RETURN(rc);
+}
+
+/*
+ * Update statfs data if the current osfs age is older than max_age.
+ * If wait is not set, it means that we are called from lov_create()
+ * and we should just issue the rpcs without waiting for them to complete.
+ * If wait is set, we are called from alloc_qos() and we just have
+ * to wait for the request set to complete.
+ */
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait)
+{
+        struct lov_obd         *lov = &obd->u.lov;
+        struct obd_info        *oinfo;
+        int                     rc = 0;
+        struct ptlrpc_request_set *set = NULL;
+        ENTRY;
+
+        if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
+                /* statfs data are quite recent, don't need to refresh it */
+                RETURN_EXIT;
+
+        if (!wait && lov->lov_qos.lq_statfs_in_progress)
+                /* statfs already in progress */
+                RETURN_EXIT;
+
+        down_write(&lov->lov_qos.lq_rw_sem);
+        if (lov->lov_qos.lq_statfs_in_progress) {
+                up_write(&lov->lov_qos.lq_rw_sem);
+                GOTO(out, rc = 0);
+        }
+        /* no statfs in flight, send rpcs */
+        lov->lov_qos.lq_statfs_in_progress = 1;
+        up_write(&lov->lov_qos.lq_rw_sem);
+
+        if (wait)
+                CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data "
+                       "in a timely manner (osfs age "LPU64", max age "LPU64")"
+                       ", sending new statfs rpcs\n",
+                       obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age,
+                       max_age);
+
+        /* need to send statfs rpcs */
+        CDEBUG(D_QOS, "sending new statfs requests\n");
+        memset(lov->lov_qos.lq_statfs_data, 0,
+               sizeof(*lov->lov_qos.lq_statfs_data));
+        oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi;
+        oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs;
+        oinfo->oi_flags = OBD_STATFS_NODELAY;
+        set = ptlrpc_prep_set();
+        if (!set)
+                GOTO(out_failed, rc = -ENOMEM);
+
+        rc = obd_statfs_async(obd, oinfo, max_age, set);
+        if (rc || list_empty(&set->set_requests)) {
+                if (rc)
+                        CWARN("statfs failed with %d\n", rc);
+                GOTO(out_failed, rc);
+        }
+        /* send requests via ptlrpcd */
+        oinfo->oi_flags |= OBD_STATFS_PTLRPCD;
+        ptlrpcd_add_rqset(set);
+        GOTO(out, rc);
+
+out_failed:
+        down_write(&lov->lov_qos.lq_rw_sem);
+        lov->lov_qos.lq_statfs_in_progress = 0;
+        /* wake up any threads waiting for the statfs rpcs to complete */
+        cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+        up_write(&lov->lov_qos.lq_rw_sem);
+        wait = 0;
+out:
+        if (set)
+                ptlrpc_set_destroy(set);
+        if (wait) {
+                struct l_wait_info lwi = { 0 };
+                CDEBUG(D_QOS, "waiting for statfs requests to complete\n");
+                l_wait_event(lov->lov_qos.lq_statfs_waitq,
+                             qos_statfs_ready(obd, max_age), &lwi);
+                if (cfs_time_before_64(obd->obd_osfs_age, max_age))
+                        CDEBUG(D_QOS, "%s: still no fresh statfs data after "
+                                      "waiting (osfs age "LPU64", max age "
+                                      LPU64")\n",
+                                      obd_uuid2str(&lov->desc.ld_uuid),
+                                      obd->obd_osfs_age, max_age);
+        }
+}
index 7e2eda5..9fb2944 100644 (file)
@@ -113,7 +113,7 @@ int lov_update_common_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -876,7 +876,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                         continue;
 
                 loi = oinfo->oi_md->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         GOTO(out, rc = -EIO);
@@ -1075,7 +1075,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
                 struct lov_request *req;
 
                 loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -1134,7 +1134,7 @@ int lov_update_setattr_set(struct lov_request_set *set,
         lov_update_set(set, req, rc);
 
         /* grace error on inactive ost */
-        if (rc && !(lov->lov_tgts[req->rq_idx] && 
+        if (rc && !(lov->lov_tgts[req->rq_idx] &&
                     lov->lov_tgts[req->rq_idx]->ltd_active))
                 rc = 0;
 
@@ -1576,7 +1576,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 }
 
 /* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
 static int cb_statfs_update(struct obd_info *oinfo, int rc)
 {
         struct lov_request *lovreq;
@@ -1602,7 +1602,7 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc)
                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
                         rc = 0;
-                RETURN(rc);
+                GOTO(out, rc);
         }
 
         spin_lock(&obd->obd_osfs_lock);
@@ -1613,6 +1613,14 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc)
 
         lov_update_statfs(osfs, lov_sfs, success);
         qos_update(lov);
+out:
+        if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+            lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+               lov_statfs_interpret(NULL, lovreq->rq_rqset,
+                                    lovreq->rq_rqset->set_success !=
+                                                  lovreq->rq_rqset->set_count);
+               qos_statfs_done(lov);
+        }
 
         RETURN(0);
 }
index 3ccff0f..c5bb52e 100644 (file)
@@ -69,7 +69,29 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
-/* 
+/*
+ * Move all request from an existing request set to the ptlrpcd queue.
+ * All requests from the set must be in phase RQ_PHASE_NEW.
+ */
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
+{
+        struct list_head *tmp, *pos;
+
+        list_for_each_safe(pos, tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(pos, struct ptlrpc_request, rq_set_chain);
+
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                list_del_init(&req->rq_set_chain);
+                req->rq_set = NULL;
+                ptlrpcd_add_req(req);
+                set->set_remaining--;
+        }
+        LASSERT(set->set_remaining == 0);
+}
+EXPORT_SYMBOL(ptlrpcd_add_rqset);
+
+/*
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
@@ -86,7 +108,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
         if (rc) {
                 int (*interpreter)(struct ptlrpc_request *,
                                    void *, int);
-                                
+
                 interpreter = req->rq_interpret_reply;
 
                 /*
@@ -116,8 +138,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                 list_del_init(&req->rq_set_chain);
                 ptlrpc_set_add_req(pc->pc_set, req);
-                /* 
-                 * Need to calculate its timeout. 
+                /*
+                 * Need to calculate its timeout.
                  */
                 rc = 1;
         }
@@ -126,9 +148,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         if (pc->pc_set->set_remaining) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
-                /* 
+                /*
                  * XXX: our set never completes, so we prune the completed
-                 * reqs after each iteration. boy could this be smarter. 
+                 * reqs after each iteration. boy could this be smarter.
                  */
                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                         req = list_entry(pos, struct ptlrpc_request,
@@ -143,8 +165,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         }
 
         if (rc == 0) {
-                /* 
-                 * If new requests have been added, make sure to wake up. 
+                /*
+                 * If new requests have been added, make sure to wake up.
                  */
                 spin_lock(&pc->pc_set->set_new_req_lock);
                 rc = !list_empty(&pc->pc_set->set_new_requests);
@@ -155,7 +177,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 }
 
 #ifdef __KERNEL__
-/* 
+/*
  * ptlrpc's code paths like to execute in process context, so we have this
  * thread which spins on a set which contains the io rpcs. llite specifies
  * ptlrpcd's set when it pushes pages down into the oscs.
@@ -173,18 +195,18 @@ static int ptlrpcd(void *arg)
 
         complete(&pc->pc_starting);
 
-        /* 
+        /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
          * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
-         * there are requests in the set. New requests come in on the set's 
-         * new_req_list and ptlrpcd_check() moves them into the set. 
+         * there are requests in the set. New requests come in on the set's
+         * new_req_list and ptlrpcd_check() moves them into the set.
          */
         do {
                 struct l_wait_info lwi;
                 int timeout;
 
                 timeout = ptlrpc_set_next_timeout(pc->pc_set);
-                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), 
+                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
                                   ptlrpc_expired_set, pc->pc_set);
 
                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
@@ -198,14 +220,14 @@ static int ptlrpcd(void *arg)
                         exit++;
                 }
 
-                /* 
+                /*
                  * Let's make one more loop to make sure that ptlrpcd_check()
                  * copied all raced new rpcs into the set so we can kill them.
                  */
         } while (exit < 2);
 
-        /* 
-         * Wait for inflight requests to drain. 
+        /*
+         * Wait for inflight requests to drain.
          */
         if (!list_empty(&pc->pc_set->set_requests))
                 ptlrpc_set_wait(pc->pc_set);
@@ -225,8 +247,8 @@ int ptlrpcd_check_async_rpcs(void *arg)
         struct ptlrpcd_ctl *pc = arg;
         int                  rc = 0;
 
-        /* 
-         * Single threaded!! 
+        /*
+         * Single threaded!!
          */
         pc->pc_recurred++;
 
@@ -234,8 +256,8 @@ int ptlrpcd_check_async_rpcs(void *arg)
                 rc = ptlrpcd_check(pc);
                 if (!rc)
                         ptlrpc_expired_set(pc->pc_set);
-                /* 
-                 * XXX: send replay requests. 
+                /*
+                 * XXX: send replay requests.
                  */
                 if (pc == &ptlrpcd_recovery_pc)
                         rc = ptlrpcd_check(pc);
@@ -259,9 +281,9 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 {
         int rc = 0;
         ENTRY;
-        /* 
-         * Do not allow start second thread for one pc. 
+
+        /*
+         * Do not allow start second thread for one pc.
          */
         if (test_bit(LIOD_START, &pc->pc_flags)) {
                 CERROR("Starting second thread (%s) for same pc %p\n",