Whamcloud - gitweb
Branch: HEAD
authorwangdi <wangdi>
Thu, 26 Feb 2009 17:45:48 +0000 (17:45 +0000)
committerwangdi <wangdi>
Thu, 26 Feb 2009 17:45:48 +0000 (17:45 +0000)
b=12069
Shrink client grant cache when there is not enough space on OST.
i=adilger,johann,oleg

16 files changed:
lustre/ChangeLog
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/include/obd_ost.h
lustre/include/obd_support.h
lustre/llite/llite_lib.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_module.c

index 7d14da4..f461787 100644 (file)
@@ -14,6 +14,15 @@ tbd  Sun Microsystems, Inc.
        * File join has been disabled in this release, refer to Bugzilla 16929.
 
 Severity   : normal
+Frequency  : normal 
+Bugzilla   : 12069
+Descriptoin: OST grants too much space to clients even when there is not enough space.
+Details    : Client will shrink its grant cache to OST if there is no write
+            activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve
+            this grant cache if there is already not enough available space
+            (left_space < total_clients * 32M).
+
+Severity   : normal
 Frequency  : start MDS on uncleanly shutdowned MDS device
 Bugzilla   : 16839
 Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished
index 8c4fccc..df4d5b4 100644 (file)
@@ -765,6 +765,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define OBD_CONNECT_FID        0x40000000ULL /*FID is supported by server */
 #define OBD_CONNECT_VBR        0x80000000ULL /*version based recovery */
 #define OBD_CONNECT_LOV_V3      0x100000000ULL /*client supports LOV v3 EA */
+#define OBD_CONNECT_GRANT_SHRINK  0x200000000ULL /* support grant shrink */
 #define OBD_CONNECT_SKIP_ORPHAN 0x400000000ULL /* don't reuse orphan objids */
 /* also update obd_connect_names[] for lprocfs_rd_connect_flags()
  * and lustre/utils/wirecheck.c */
@@ -795,7 +796,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_CHANGE_QS | \
                                 OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_RMT_CLIENT | \
                                 OBD_CONNECT_RMT_CLIENT_FORCE | \
-                                OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN)
+                                OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
+                               OBD_CONNECT_GRANT_SHRINK)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
 
@@ -893,18 +895,11 @@ typedef __u32 obd_count;
 #define OBD_FL_NO_GRPQUOTA   (0x00000200) /* the object's group is over quota */
 #define OBD_FL_CREATE_CROW   (0x00000400) /* object should be create on write */
 
-/**
- * Set this to delegate DLM locking during obd_punch() to the OSTs. Only OSTs
- * that declared OBD_CONNECT_TRUNCLOCK in their connect flags support this
- * functionality.
- */
-#define OBD_FL_TRUNCLOCK     (0x00000800)
+#define OBD_FL_TRUNCLOCK     (0x00000800) /* delegate DLM locking during punch */
+#define OBD_FL_CKSUM_CRC32   (0x00001000) /* CRC32 checksum type */
+#define OBD_FL_CKSUM_ADLER   (0x00002000) /* ADLER checksum type */
+#define OBD_FL_SHRINK_GRANT  (0x00004000) /* object shrink the grant */
 
-/*
- * Checksum types
- */
-#define OBD_FL_CKSUM_CRC32    (0x00001000)
-#define OBD_FL_CKSUM_ADLER    (0x00002000)
 #define OBD_FL_CKSUM_ALL      (OBD_FL_CKSUM_CRC32 | OBD_FL_CKSUM_ADLER)
 
 #define LOV_MAGIC_V1      0x0BD10BD0
index 67efebc..f0babd3 100644 (file)
@@ -1281,8 +1281,17 @@ int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
 int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
 
 /* ptlrpc/pinger.c */
+/* Kinds of periodic events that can be registered with the pinger. */
+enum timeout_event {
+        TIMEOUT_GRANT = 1       /* used by the OSC grant-shrink code */
+};
+
+struct timeout_item;
+
+/* Callback invoked for a registered timeout item; the second argument is
+ * the opaque data pointer stored in the item at registration time. */
+typedef int (*timeout_cb_t)(struct timeout_item *, void *);
 int ptlrpc_pinger_add_import(struct obd_import *imp);
 int ptlrpc_pinger_del_import(struct obd_import *imp);
+int ptlrpc_add_timeout_client(int time, enum timeout_event event,
+                              timeout_cb_t cb, void *data,
+                              struct list_head *obd_list);
+int ptlrpc_del_timeout_client(struct list_head *obd_list);
 struct ptlrpc_request * ptlrpc_prep_ping(struct obd_import *imp);
 int ptlrpc_obd_ping(struct obd_device *obd);
 cfs_time_t ptlrpc_suspend_wakeup_time(void);
index bec9def..e08d367 100644 (file)
@@ -187,6 +187,7 @@ extern const struct req_format RQF_OST_DESTROY;
 extern const struct req_format RQF_OST_BRW;
 extern const struct req_format RQF_OST_STATFS;
 extern const struct req_format RQF_OST_SET_INFO;
+extern const struct req_format RQF_OST_SET_GRANT_INFO;
 extern const struct req_format RQF_OST_GET_INFO_GENERIC;
 extern const struct req_format RQF_OST_GET_INFO_LAST_ID;
 extern const struct req_format RQF_OST_GET_INFO_FIEMAP;
index b699209..2a7e40b 100644 (file)
@@ -315,6 +315,7 @@ struct filter_obd {
         obd_size             fo_tot_dirty;      /* protected by obd_osfs_lock */
         obd_size             fo_tot_granted;    /* all values in bytes */
         obd_size             fo_tot_pending;
+        int                  fo_tot_granted_clients;
 
         obd_size             fo_readcache_max_filesize;
         int                  fo_read_cache;
@@ -370,6 +371,14 @@ struct filter_obd {
         int                      fo_sec_level;
 };
 
+/* One registered pinger timeout event and the clients attached to it. */
+struct timeout_item {
+        enum timeout_event ti_event;    /* event kind; one item per kind */
+        cfs_time_t         ti_timeout;  /* interval given at registration */
+        timeout_cb_t       ti_cb;       /* callback run by the pinger */
+        void              *ti_cb_data;  /* opaque argument handed to ti_cb */
+        struct list_head   ti_obd_list; /* head of the attached client list */
+        struct list_head   ti_chain;    /* linkage on the pinger timeout list */
+};
 #define OSC_MAX_RIF_DEFAULT       8
 #define OSC_MAX_RIF_MAX         256
 #define OSC_MAX_DIRTY_DEFAULT  (OSC_MAX_RIF_DEFAULT * 4)
@@ -406,6 +415,9 @@ struct client_obd {
         long                     cl_avail_grant;   /* bytes of credit for ost */
         long                     cl_lost_grant;    /* lost credits (trunc) */
         struct list_head         cl_cache_waiters; /* waiting for cache/grant */
+        cfs_time_t               cl_next_shrink_grant;   /* jiffies */
+        struct list_head         cl_grant_shrink_list;  /* Timeout event list */
+        struct semaphore         cl_grant_sem;   /*grant shrink list semaphore*/
 
         /* keep track of objects that have lois that contain pages which
          * have been queued for async brw.  this lock also protects the
@@ -1120,6 +1132,7 @@ enum obd_cleanup_stage {
 /* XXX unused ?*/
 #define KEY_INTERMDS            "inter_mds"
 #define KEY_ASYNC               "async"
+#define KEY_GRANT_SHRINK        "grant_shrink"
 
 struct lu_context;
 
index 8ddb969..81a3209 100644 (file)
@@ -58,6 +58,7 @@ struct osc_brw_async_args {
         struct cl_req     *aa_clerq;
 };
 
+#define osc_grant_args osc_brw_async_args
 struct osc_async_args {
         struct obd_info   *aa_oi;
 };
index 5ef9fed..7096eaa 100644 (file)
@@ -125,6 +125,12 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #endif
 #define LONG_UNLINK 300          /* Unlink should happen before now */
 
+/**
+ * Grant shrink interval in seconds: if a client has been "idle" for longer
+ * than this, it returns part of its unused grant space to the filter (OST).
+ */
+#define GRANT_SHRINK_INTERVAL             360/*6 minutes*/
+
 
 #define OBD_FAIL_MDS                     0x100
 #define OBD_FAIL_MDS_HANDLE_UNPACK       0x101
index b407584..b4252fe 100644 (file)
@@ -339,7 +339,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT_CANCELSET | OBD_CONNECT_FID      |
                                   OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK|
                                   OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT |
-                                  OBD_CONNECT_OSS_CAPA;
+                                  OBD_CONNECT_OSS_CAPA | OBD_CONNECT_GRANT_SHRINK;
 
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
index 1a9af70..fdccecc 100644 (file)
@@ -2676,6 +2676,7 @@ static int filter_connect_internal(struct obd_export *exp,
                 RETURN(-EPROTO);
 
         if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
+                struct filter_obd *filter = &exp->exp_obd->u.filter;
                 obd_size left, want;
 
                 spin_lock(&exp->exp_obd->obd_osfs_lock);
@@ -2689,6 +2690,8 @@ static int filter_connect_internal(struct obd_export *exp,
                        LPU64" left: "LPU64"\n", exp->exp_obd->obd_name,
                        exp->exp_client_uuid.uuid, exp,
                        data->ocd_grant, want, left);
+                
+                filter->fo_tot_granted_clients ++;
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
@@ -2986,6 +2989,12 @@ static int filter_destroy_export(struct obd_export *exp)
         filter_grant_discard(exp);
         filter_fmd_cleanup(exp);
 
+        if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) {
+                struct filter_obd *filter = &exp->exp_obd->u.filter;
+                if (filter->fo_tot_granted_clients > 0)
+                        filter->fo_tot_granted_clients --;
+        }
+
         if (!(exp->exp_flags & OBD_OPT_FORCE))
                 filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
 
@@ -4344,6 +4353,15 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                 RETURN(0);
         }
 
+        if (KEY_IS(KEY_GRANT_SHRINK)) {
+                struct ost_body *body = (struct ost_body *)val;
+                /* handle shrink grant */
+                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                filter_grant_incoming(exp, &body->oa);
+                spin_unlock(&exp->exp_obd->obd_osfs_lock);
+                RETURN(rc);
+        }
+
         if (!KEY_IS(KEY_MDS_CONN))
                 RETURN(-EINVAL);
 
index 28578d6..9cb6de0 100644 (file)
@@ -59,6 +59,7 @@
                               OBD_INCOMPAT_COMMON_LR)
 
 #define FILTER_GRANT_CHUNK (2ULL * PTLRPC_MAX_BRW_SIZE)
+#define FILTER_GRANT_SHRINK_LIMIT (16ULL * FILTER_GRANT_CHUNK)
 #define GRANT_FOR_LLOG(obd) 16
 
 extern struct file_operations filter_per_export_stats_fops;
@@ -188,6 +189,7 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
                   obd_size want, obd_size fs_space_left);
 void filter_grant_commit(struct obd_export *exp, int niocount,
                          struct niobuf_local *res);
+void filter_grant_incoming(struct obd_export *exp, struct obdo *oa);
 struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *, int rw,
                                         int num_pages);
 void filter_free_iobuf(struct filter_iobuf *iobuf);
index 74d64ef..fa9a96f 100644 (file)
@@ -59,7 +59,7 @@ int *obdfilter_created_scratchpad;
 /* Grab the dirty and seen grant announcements from the incoming obdo.
  * We will later calculate the clients new grant and return it.
  * Caller must hold osfs lock */
-static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
+void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
 {
         struct filter_export_data *fed;
         struct obd_device *obd = exp->exp_obd;
@@ -108,6 +108,26 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
         obd->u.filter.fo_tot_granted -= oa->o_dropped;
         fed->fed_grant -= oa->o_dropped;
         fed->fed_dirty = oa->o_dirty;
+
+        if (oa->o_flags & OBD_FL_SHRINK_GRANT) {
+                obd_size left_space = filter_grant_space_left(exp);
+                struct filter_obd *filter = &exp->exp_obd->u.filter;
+
+                /*Only if left_space < fo_tot_clients * 32M, 
+                 *then the grant space could be shrinked */
+                if (left_space < filter->fo_tot_granted_clients * 
+                                 FILTER_GRANT_SHRINK_LIMIT) { 
+                        fed->fed_grant -= oa->o_grant;
+                        filter->fo_tot_granted -= oa->o_grant;
+                        CDEBUG(D_CACHE, "%s: cli %s/%p shrink "LPU64
+                               "fed_grant %ld total "LPU64"\n",
+                               obd->obd_name, exp->exp_client_uuid.uuid,
+                               exp, oa->o_grant, fed->fed_grant,
+                               filter->fo_tot_granted);
+                        oa->o_grant = 0;
+                }
+        }
+
         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
@@ -373,7 +393,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 spin_lock(&obd->obd_osfs_lock);
                 filter_grant_incoming(exp, oa);
 
-                oa->o_grant = 0;
+                if (!(oa->o_flags & OBD_FL_SHRINK_GRANT))
+                        oa->o_grant = 0;
                 spin_unlock(&obd->obd_osfs_lock);
         }
 
index 3cbe9c0..b8ee00a 100644 (file)
@@ -795,6 +795,15 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+
+}
+
+static void osc_update_next_shrink(struct client_obd *cli)
+{
+        int time = GRANT_SHRINK_INTERVAL;
+        cli->cl_next_shrink_grant = cfs_time_shift(time);
+        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
+               cli->cl_next_shrink_grant);
 }
 
 /* caller must hold loi_list_lock */
@@ -809,6 +818,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
                CFS_PAGE_SIZE, pga, pga->pg);
         LASSERT(cli->cl_avail_grant >= 0);
+        osc_update_next_shrink(cli);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -902,25 +912,143 @@ void osc_wake_cache_waiters(struct client_obd *cli)
         EXIT;
 }
 
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        cli->cl_avail_grant = ocd->ocd_grant;
+        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+        if (body->oa.o_valid & OBD_MD_FLGRANT)
+                cli->cl_avail_grant += body->oa.o_grant;
+        /* waiters are woken in brw_interpret */
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
 
-        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
-               cli->cl_avail_grant, cli->cl_lost_grant);
-        LASSERT(cli->cl_avail_grant >= 0);
+static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
+                              void *key, obd_count vallen, void *val,
+                              struct ptlrpc_request_set *set);
+
+static int osc_shrink_grant_interpret(const struct lu_env *env,
+                                     struct ptlrpc_request *req,
+                                      void *aa, int rc)
+{
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
+        struct ost_body *body;
+        
+        if (rc != 0) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                cli->cl_avail_grant += oa->o_grant;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                GOTO(out, rc);
+        }
+
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        LASSERT(body);
+        osc_update_grant(cli, body);
+out:
+        OBD_FREE_PTR(oa);
+        return rc;        
 }
 
-static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        if (body->oa.o_valid & OBD_MD_FLGRANT)
+        oa->o_grant = cli->cl_avail_grant / 4;
+        cli->cl_avail_grant -= oa->o_grant; 
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        oa->o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+}
+
+static int osc_shrink_grant(struct client_obd *cli)
+{
+        int    rc = 0;
+        struct ost_body     *body;
+        ENTRY;
+
+        OBD_ALLOC_PTR(body);
+        if (!body)
+                RETURN(-ENOMEM);
+
+        osc_announce_cached(cli, &body->oa, 0);
+        osc_shrink_grant_local(cli, &body->oa);
+        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
+                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+                                sizeof(*body), body, NULL);
+        if (rc) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
                 cli->cl_avail_grant += body->oa.o_grant;
-        /* waiters are woken in brw_interpret */
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+        }
+        if (body)
+               OBD_FREE_PTR(body);
+        RETURN(rc);
+}
+
+/* A client only shrinks when it holds more available grant than this. */
+#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
+
+/* Decide whether @client should give grant back now.
+ *
+ * Returns 1 when the shrink deadline (less a 5-tick slack) has been reached,
+ * the import is in LUSTRE_IMP_FULL state and the available grant exceeds
+ * GRANT_SHRINK_LIMIT.  If the deadline has been reached but the other
+ * conditions do not hold, the deadline is re-armed and 0 is returned.
+ * Returns 0 without side effects when the deadline is still in the future. */
+static int osc_should_shrink_grant(struct client_obd *client)
+{
+        cfs_time_t now = cfs_time_current();
+        cfs_time_t deadline = client->cl_next_shrink_grant;
+
+        if (!cfs_time_aftereq(now, deadline - 5 * CFS_TICK))
+                return 0;
+
+        if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
+            client->cl_avail_grant > GRANT_SHRINK_LIMIT)
+                return 1;
+
+        /* Due, but not eligible right now: try again one interval later. */
+        osc_update_next_shrink(client);
+        return 0;
+}
+
+/* Pinger callback for TIMEOUT_GRANT: visit every client attached to @item
+ * and start a grant shrink on those that are due.  @data is unused.
+ * Always returns 0; the result of each individual shrink is ignored. */
+static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+{
+        struct client_obd *cli;
+
+        list_for_each_entry(cli, &item->ti_obd_list, cl_grant_shrink_list) {
+                if (!osc_should_shrink_grant(cli))
+                        continue;
+                osc_shrink_grant(cli);
+        }
+
+        return 0;
+}
+
+/* Attach @client to the pinger's TIMEOUT_GRANT event and arm its first
+ * shrink deadline.  Returns 0 on success, or the error reported by
+ * ptlrpc_add_timeout_client() (in which case the deadline is not armed). */
+static int osc_add_shrink_grant(struct client_obd *client)
+{
+        int err = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
+                                            TIMEOUT_GRANT,
+                                            osc_grant_shrink_grant_cb, NULL,
+                                            &client->cl_grant_shrink_list);
+
+        if (err != 0) {
+                CERROR("add grant client %s error %d\n",
+                       client->cl_import->imp_obd->obd_name, err);
+                return err;
+        }
+
+        CDEBUG(D_CACHE, "add grant client %s \n",
+               client->cl_import->imp_obd->obd_name);
+        osc_update_next_shrink(client);
+        return 0;
+}
+
+/* Detach @client from the pinger's grant-shrink list.  Returns whatever
+ * ptlrpc_del_timeout_client() returns. */
+static int osc_del_shrink_grant(struct client_obd *client)
+{
+        struct list_head *link = &client->cl_grant_shrink_list;
+
+        CDEBUG(D_CACHE, "del grant client %s \n",
+               client->cl_import->imp_obd->obd_name);
+        return ptlrpc_del_timeout_client(link);
+}
+
+static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+{
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        cli->cl_avail_grant = ocd->ocd_grant;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+            list_empty(&cli->cl_grant_shrink_list))
+                osc_add_shrink_grant(cli);
+
+        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
+               cli->cl_avail_grant, cli->cl_lost_grant);
+        LASSERT(cli->cl_avail_grant >= 0);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -1172,6 +1300,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 (void *)(niobuf - niocount));
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+        if (osc_should_shrink_grant(cli))
+                osc_shrink_grant_local(cli, &body->oa); 
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
@@ -3615,7 +3745,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
-        if (!set)
+        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                 RETURN(-EINVAL);
 
         /* We pass all other commands directly to OST. Since nobody calls osc
@@ -3625,9 +3755,12 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
            Even if something bad goes through, we'd get a -EINVAL from OST
            anyway. */
 
-
-        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
-        if (req == NULL)
+       if (KEY_IS(KEY_GRANT_SHRINK))  
+                       req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); 
+       else 
+               req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
+        
+       if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
@@ -3652,13 +3785,31 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
-        }
-
-        ptlrpc_request_set_replen(req);
-        ptlrpc_set_add_req(set, req);
-        ptlrpc_check_set(NULL, set);
+        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+                struct osc_grant_args *aa;
+                struct obdo *oa;
 
-        RETURN(0);
+                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                aa = ptlrpc_req_async_args(req);
+                OBD_ALLOC_PTR(oa);
+                if (!oa) {
+                        ptlrpc_req_finished(req);
+                        RETURN(-ENOMEM);
+                }
+                *oa = ((struct ost_body *)val)->oa;
+                aa->aa_oa = oa;
+               req->rq_interpret_reply = osc_shrink_grant_interpret;
+       }
+               
+       ptlrpc_request_set_replen(req);
+       if (!KEY_IS(KEY_GRANT_SHRINK)) {
+               LASSERT(set != NULL);
+               ptlrpc_set_add_req(set, req);
+               ptlrpc_check_set(NULL, set);
+       } else 
+               ptlrpcd_add_req(req, PSCOPE_OTHER);
+        
+       RETURN(0);
 }
 
 
@@ -3779,6 +3930,7 @@ static int osc_disconnect(struct obd_export *exp)
                        obd);
         }
 
+        osc_del_shrink_grant(&obd->u.cli);
         rc = client_disconnect_export(exp);
         return rc;
 }
@@ -3901,6 +4053,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+               
+                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+                sema_init(&cli->cl_grant_sem, 1);
         }
 
         RETURN(rc);
index cb520bb..fa1713b 100644 (file)
@@ -1152,6 +1152,8 @@ out:
 
 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
 {
+        struct ost_body *body = NULL, *repbody;
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         char *key, *val = NULL;
         int keylen, vallen, rc = 0;
         ENTRY;
@@ -1163,13 +1165,33 @@ static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
         }
         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
 
-        rc = lustre_pack_reply(req, 1, NULL, NULL);
-        if (rc)
-                RETURN(rc);
+        if (KEY_IS(KEY_GRANT_SHRINK)) {
+                rc = lustre_pack_reply(req, 2, size, NULL);
+                if (rc)
+                        RETURN(rc); 
+        } else {
+                rc = lustre_pack_reply(req, 1, NULL, NULL);
+                if (rc)
+                        RETURN(rc);
+        }
 
         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
-        if (vallen)
-                val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
+        if (vallen) {
+                if (KEY_IS(KEY_GRANT_SHRINK)) { 
+                        body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, 
+                                                  sizeof(*body),
+                                                  lustre_swab_ost_body);
+                        if (!body)
+                                RETURN(-EFAULT);
+
+                        repbody = lustre_msg_buf(req->rq_repmsg, 
+                                                 REPLY_REC_OFF,
+                                                 sizeof(*repbody));
+                        memcpy(repbody, body, sizeof(*body));
+                        val = (char*)repbody;
+                } else 
+                        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
+        }
 
         if (KEY_IS(KEY_EVICT_BY_NID)) {
                 if (val && vallen)
index 86f44a2..66b0dc1 100644 (file)
@@ -574,6 +574,7 @@ static const struct req_format *req_formats[] = {
         &RQF_OST_BRW,
         &RQF_OST_STATFS,
         &RQF_OST_SET_INFO,
+        &RQF_OST_SET_GRANT_INFO,
         &RQF_OST_GET_INFO_GENERIC,
         &RQF_OST_GET_INFO_LAST_ID,
         &RQF_OST_GET_INFO_FIEMAP,
@@ -1202,6 +1203,11 @@ const struct req_format RQF_OST_SET_INFO =
         DEFINE_REQ_FMT0("OST_SET_INFO", ost_set_info_client, empty);
 EXPORT_SYMBOL(RQF_OST_SET_INFO);
 
+/* Request format for the grant-shrink set_info RPC: the request uses the
+ * ost_set_info_client layout and the reply uses ost_body_only. */
+const struct req_format RQF_OST_SET_GRANT_INFO =
+        DEFINE_REQ_FMT0("OST_SET_GRANT_INFO",
+                        ost_set_info_client, ost_body_only);
+EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
+
 const struct req_format RQF_OST_GET_INFO_GENERIC =
         DEFINE_REQ_FMT0("OST_GET_INFO", ost_get_info_generic_client,
                                         ost_get_info_generic_server);
index 7a15386..daec0f5 100644 (file)
@@ -50,7 +50,7 @@
 
 struct semaphore pinger_sem;
 static CFS_LIST_HEAD(pinger_imports);
-
+static struct list_head timeout_list = CFS_LIST_HEAD_INIT(timeout_list); 
 struct ptlrpc_request *
 ptlrpc_prep_ping(struct obd_import *imp)
 {
@@ -135,6 +135,25 @@ static inline int ptlrpc_next_reconnect(struct obd_import *imp)
 static atomic_t suspend_timeouts = ATOMIC_INIT(0);
 static cfs_time_t suspend_wakeup_time = 0;
 
+/* Compute how long the pinger should wait before its next wakeup.
+ *
+ * The wait interval is PING_INTERVAL, or the timeout of the first item on
+ * timeout_list if that is smaller (the list is kept in increasing timeout
+ * order, so only its head needs to be examined).  The result is the
+ * duration from now until @time plus that interval. */
+cfs_duration_t pinger_check_timeout(cfs_time_t time)
+{
+        cfs_time_t timeout = PING_INTERVAL;
+
+        mutex_down(&pinger_sem);
+        if (!list_empty(&timeout_list)) {
+                struct timeout_item *first = list_entry(timeout_list.next,
+                                                        struct timeout_item,
+                                                        ti_chain);
+                int ti_timeout = first->ti_timeout;
+
+                if (timeout > ti_timeout)
+                        timeout = ti_timeout;
+        }
+        mutex_up(&pinger_sem);
+
+        return cfs_time_sub(cfs_time_add(time, cfs_time_seconds(timeout)),
+                            cfs_time_current());
+}
+
 #ifdef __KERNEL__
 static wait_queue_head_t suspend_timeouts_waitq;
 #endif
@@ -250,10 +269,14 @@ static int ptlrpc_pinger_main(void *arg)
         while (1) {
                 cfs_time_t this_ping = cfs_time_current();
                 struct l_wait_info lwi;
-                cfs_duration_t time_to_next_ping;
+                cfs_duration_t time_to_next_wake;
+                struct timeout_item *item;
                 struct list_head *iter;
 
                 mutex_down(&pinger_sem);
+                list_for_each_entry(item, &timeout_list, ti_chain) {
+                        item->ti_cb(item, item->ti_cb_data);
+                }
                 list_for_each(iter, &pinger_imports) {
                         struct obd_import *imp =
                                 list_entry(iter, struct obd_import,
@@ -272,25 +295,19 @@ static int ptlrpc_pinger_main(void *arg)
                 obd_update_maxusage();
 
                 /* Wait until the next ping time, or until we're stopped. */
-                time_to_next_ping = cfs_time_sub(cfs_time_add(this_ping,
-                                               cfs_time_seconds(PING_INTERVAL)),
-                                               cfs_time_current());
-
+                time_to_next_wake = pinger_check_timeout(this_ping);
                 /* The ping sent by ptlrpc_send_rpc may get sent out
                    say .01 second after this.
                    ptlrpc_pinger_sending_on_import will then set the
                    next ping time to next_ping + .01 sec, which means
                    we will SKIP the next ping at next_ping, and the
                    ping will get sent 2 timeouts from now!  Beware. */
-                CDEBUG(D_INFO, "next ping in "CFS_DURATION_T" ("CFS_TIME_T")\n",
-                               time_to_next_ping,
-                               cfs_time_add(this_ping,
-                                            cfs_time_seconds(PING_INTERVAL)));
-                if (time_to_next_ping > 0) {
-                        lwi = LWI_TIMEOUT(max_t(cfs_duration_t,
-                                                time_to_next_ping,
-                                                cfs_time_seconds(1)),
-                                          NULL, NULL);
+                CDEBUG(D_INFO, "next wakeup in "CFS_DURATION_T" ("CFS_TIME_T")\n",
+                                time_to_next_wake,
+                                cfs_time_add(this_ping, cfs_time_seconds(PING_INTERVAL)));
+                if (time_to_next_wake > 0) {
+                        lwi = LWI_TIMEOUT(max_t(cfs_duration_t, time_to_next_wake, cfs_time_seconds(1)),
+                                            NULL, NULL);
                         l_wait_event(thread->t_ctl_waitq,
                                      thread->t_flags & (SVC_STOPPING|SVC_EVENT),
                                      &lwi);
@@ -351,6 +368,8 @@ int ptlrpc_start_pinger(void)
         RETURN(0);
 }
 
+int ptlrpc_pinger_remove_timeouts(void);
+
 int ptlrpc_stop_pinger(void)
 {
         struct l_wait_info lwi = { 0 };
@@ -362,6 +381,8 @@ int ptlrpc_stop_pinger(void)
 
         if (pinger_thread == NULL)
                 RETURN(-EALREADY);
+
+        ptlrpc_pinger_remove_timeouts();
         mutex_down(&pinger_sem);
         pinger_thread->t_flags = SVC_STOPPING;
         cfs_waitq_signal(&pinger_thread->t_ctl_waitq);
@@ -419,6 +440,105 @@ int ptlrpc_pinger_del_import(struct obd_import *imp)
         RETURN(0);
 }
 
+/**
+ * Allocate and initialise a timeout item describing a callback to be run
+ * by the pinger.  The item is not linked into any list here; both of its
+ * list heads are initialised empty.
+ *
+ * Returns the new item, or NULL if the allocation fails.
+ */
+struct timeout_item* ptlrpc_new_timeout(int time, enum timeout_event event,
+                                        timeout_cb_t cb, void *data)
+{
+        struct timeout_item *ti;
+
+        OBD_ALLOC_PTR(ti);
+        if (ti == NULL)
+                return NULL;
+
+        ti->ti_event = event;
+        ti->ti_timeout = time;
+        ti->ti_cb = cb;
+        ti->ti_cb_data = data;
+        CFS_INIT_LIST_HEAD(&ti->ti_obd_list);
+        CFS_INIT_LIST_HEAD(&ti->ti_chain);
+
+        return ti;
+}
+
+/**
+ * Register a timeout event on the pinger thread.
+ *
+ * If an item for @event is already on timeout_list it is returned
+ * unchanged.  Otherwise a new item is allocated and linked into
+ * timeout_list, which is kept sorted by increasing timeout value.
+ *
+ * The caller must hold pinger_sem.
+ *
+ * Returns the existing or newly created item, or NULL if a new item could
+ * not be allocated.
+ */
+static struct timeout_item*
+ptlrpc_pinger_register_timeout(int time, enum timeout_event event,
+                               timeout_cb_t cb, void *data)
+{
+        struct timeout_item *item;
+        struct timeout_item *ti;
+
+        LASSERT_SEM_LOCKED(&pinger_sem);
+
+        /* Reuse the item already registered for this event, if any. */
+        list_for_each_entry(item, &timeout_list, ti_chain) {
+                if (item->ti_event == event)
+                        return item;
+        }
+
+        /* Allocate before searching for the insertion point, so the
+         * comparison below never dereferences a NULL pointer. */
+        ti = ptlrpc_new_timeout(time, event, cb, data);
+        if (!ti)
+                return NULL;
+
+        /* Walk from the tail and insert right after the last item with a
+         * smaller timeout, so the list stays in increasing order.  Stop
+         * after the first insertion. */
+        list_for_each_entry_reverse(item, &timeout_list, ti_chain) {
+                if (item->ti_timeout < ti->ti_timeout) {
+                        list_add(&ti->ti_chain, &item->ti_chain);
+                        return ti;
+                }
+        }
+
+        /* Empty list, or every existing item has a timeout >= ours. */
+        list_add(&ti->ti_chain, &timeout_list);
+        return ti;
+}
+/* Add a client_obd to the timeout event list; when the timeout (@time)
+ * happens, the callback (@cb) will be called.
+ *
+ * @obd_list is the client's list linkage; it is added to the list of
+ * clients hanging off the timeout item registered for @event.
+ *
+ * Returns 0 on success, -EINVAL if no timeout item could be obtained, or
+ * the encoded error if registration handed back an error pointer.
+ */
+int ptlrpc_add_timeout_client(int time, enum timeout_event event,
+                              timeout_cb_t cb, void *data,
+                              struct list_head *obd_list)
+{
+        struct timeout_item *ti;
+        int rc = 0;
+
+        mutex_down(&pinger_sem);
+        ti = ptlrpc_pinger_register_timeout(time, event, cb, data);
+        if (!ti)
+                rc = -EINVAL;
+        else if (IS_ERR(ti))
+                /* Never link a client onto an error pointer. */
+                rc = PTR_ERR(ti);
+        else
+                list_add(obd_list, &ti->ti_obd_list);
+        mutex_up(&pinger_sem);
+
+        return rc;
+}
+
+/* Detach a client from the timeout item it is attached to, under
+ * pinger_sem.  The timeout item itself is left in place.
+ * Always returns 0. */
+int ptlrpc_del_timeout_client(struct list_head *obd_list)
+{
+        mutex_down(&pinger_sem);
+        list_del_init(obd_list);
+        mutex_up(&pinger_sem);
+
+        return 0;
+}
+
+/* Free every item on timeout_list, under pinger_sem.  Each item is
+ * asserted to have no clients still attached.  Always returns 0. */
+int ptlrpc_pinger_remove_timeouts(void)
+{
+        mutex_down(&pinger_sem);
+        while (!list_empty(&timeout_list)) {
+                struct timeout_item *ti = list_entry(timeout_list.next,
+                                                     struct timeout_item,
+                                                     ti_chain);
+
+                LASSERT(list_empty(&ti->ti_obd_list));
+                list_del(&ti->ti_chain);
+                OBD_FREE_PTR(ti);
+        }
+        mutex_up(&pinger_sem);
+
+        return 0;
+}
+
 void ptlrpc_pinger_wake_up()
 {
 #ifdef ENABLE_PINGER
@@ -764,6 +884,18 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 #endif
 }
 
+/* Stub variant: registers nothing and reports success. */
+int ptlrpc_add_timeout_client(int time, enum timeout_event event,
+                              timeout_cb_t cb, void *data,
+                              struct list_head *obd_list)
+{
+        return 0;
+}
+
+/* Stub variant: there is nothing to detach; reports success. */
+int ptlrpc_del_timeout_client(struct list_head *obd_list)
+{
+        return 0;
+}
+
 int ptlrpc_pinger_add_import(struct obd_import *imp)
 {
         ENTRY;
index c097f65..681873d 100644 (file)
@@ -325,6 +325,8 @@ EXPORT_SYMBOL(ptlrpc_recover_import);
 /* pinger.c */
 EXPORT_SYMBOL(ptlrpc_pinger_add_import);
 EXPORT_SYMBOL(ptlrpc_pinger_del_import);
+EXPORT_SYMBOL(ptlrpc_add_timeout_client);
+EXPORT_SYMBOL(ptlrpc_del_timeout_client);
 EXPORT_SYMBOL(ptlrpc_pinger_sending_on_import);
 
 /* ptlrpcd.c */