Whamcloud - gitweb
LU-80 mds: use md_size supplied by client, repack reply
[fs/lustre-release.git] / lustre / mdc / mdc_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDC
40 #ifndef __KERNEL__
41 # include <fcntl.h>
42 # include <liblustre.h>
43 #endif
44 #include <lustre_net.h>
45 #include <lustre/lustre_idl.h>
46 #include "mdc_internal.h"
47
48 #ifndef __KERNEL__
49 /* some liblustre hackings here */
50 #ifndef O_DIRECTORY
51 #define O_DIRECTORY     0
52 #endif
53 #endif
54
55 static void __mdc_pack_body(struct mdt_body *b, __u32 suppgid)
56 {
57         LASSERT (b != NULL);
58
59         b->suppgid = suppgid;
60         b->uid = cfs_curproc_uid();
61         b->gid = cfs_curproc_gid();
62         b->fsuid = cfs_curproc_fsuid();
63         b->fsgid = cfs_curproc_fsgid();
64         b->capability = cfs_curproc_cap_pack();
65 }
66
67 void mdc_pack_capa(struct ptlrpc_request *req, const struct req_msg_field *field,
68                    struct obd_capa *oc)
69 {
70         struct req_capsule *pill = &req->rq_pill;
71         struct lustre_capa *c;
72
73         if (oc == NULL) {
74                 LASSERT(req_capsule_get_size(pill, field, RCL_CLIENT) == 0);
75                 return;
76         }
77
78         c = req_capsule_client_get(pill, field);
79         LASSERT(c != NULL);
80         capa_cpy(c, oc);
81         DEBUG_CAPA(D_SEC, c, "pack");
82 }
83
84 void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
85                         const struct lu_fid *cfid, int flags)
86 {
87         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
88                                                     &RMF_MDT_BODY);
89
90         if (pfid) {
91                 b->fid1 = *pfid;
92                 b->valid = OBD_MD_FLID;
93         }
94         if (cfid)
95                 b->fid2 = *cfid;
96         b->flags = flags;
97 }
98
99 void mdc_pack_body(struct ptlrpc_request *req,
100                    const struct lu_fid *fid, struct obd_capa *oc,
101                    __u64 valid, int ea_size, __u32 suppgid, int flags)
102 {
103         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
104                                                     &RMF_MDT_BODY);
105         LASSERT(b != NULL);
106         b->valid = valid;
107         b->eadatasize = ea_size;
108         b->flags = flags;
109         __mdc_pack_body(b, suppgid);
110         if (fid) {
111                 b->fid1 = *fid;
112                 b->valid |= OBD_MD_FLID;
113                 mdc_pack_capa(req, &RMF_CAPA1, oc);
114         }
115 }
116
117 void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff,
118                       __u32 size, const struct lu_fid *fid, struct obd_capa *oc)
119 {
120         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
121                                                     &RMF_MDT_BODY);
122         b->fid1 = *fid;
123         b->valid |= OBD_MD_FLID;
124         b->size = pgoff;                       /* !! */
125         b->nlink = size;                        /* !! */
126         __mdc_pack_body(b, -1);
127         b->mode = LUDA_FID | LUDA_TYPE;
128
129         mdc_pack_capa(req, &RMF_CAPA1, oc);
130 }
131
132 /* packing of MDS records */
133 void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
134                      const void *data, int datalen, __u32 mode,
135                      __u32 uid, __u32 gid, cfs_cap_t cap_effective, __u64 rdev)
136 {
137         struct mdt_rec_create *rec;
138         char                  *tmp;
139
140         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
141         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
142
143
144         rec->cr_opcode   = REINT_CREATE;
145         rec->cr_fsuid    = uid;
146         rec->cr_fsgid    = gid;
147         rec->cr_cap      = cap_effective;
148         rec->cr_fid1     = op_data->op_fid1;
149         rec->cr_fid2     = op_data->op_fid2;
150         rec->cr_mode     = mode;
151         rec->cr_rdev     = rdev;
152         rec->cr_time     = op_data->op_mod_time;
153         rec->cr_suppgid1 = op_data->op_suppgids[0];
154         rec->cr_suppgid2 = op_data->op_suppgids[1];
155         set_mrc_cr_flags(rec, op_data->op_flags & MF_SOM_LOCAL_FLAGS);
156         rec->cr_bias     = op_data->op_bias;
157
158         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
159
160         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
161         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
162
163         if (data) {
164                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
165                 memcpy(tmp, data, datalen);
166         }
167 }
168
169 static __u64 mds_pack_open_flags(__u32 flags, __u32 mode)
170 {
171         __u64 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
172                                    MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS | 
173                                    MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK));
174         if (flags & O_CREAT)
175                 cr_flags |= MDS_OPEN_CREAT;
176         if (flags & O_EXCL)
177                 cr_flags |= MDS_OPEN_EXCL;
178         if (flags & O_TRUNC)
179                 cr_flags |= MDS_OPEN_TRUNC;
180         if (flags & O_APPEND)
181                 cr_flags |= MDS_OPEN_APPEND;
182         if (flags & O_SYNC)
183                 cr_flags |= MDS_OPEN_SYNC;
184         if (flags & O_DIRECTORY)
185                 cr_flags |= MDS_OPEN_DIRECTORY;
186 #ifdef FMODE_EXEC
187         if (flags & FMODE_EXEC)
188                 cr_flags |= MDS_FMODE_EXEC;
189 #endif
190         if (flags & O_LOV_DELAY_CREATE)
191                 cr_flags |= MDS_OPEN_DELAY_CREATE;
192
193         if ((flags & O_NOACCESS) || (flags & O_NONBLOCK))
194                 cr_flags |= MDS_OPEN_NORESTORE;
195
196         return cr_flags;
197 }
198
199 /* packing of MDS records */
200 void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
201                    __u32 mode, __u64 rdev, __u32 flags, const void *lmm,
202                    int lmmlen)
203 {
204         struct mdt_rec_create *rec;
205         char *tmp;
206         __u64 cr_flags;
207
208         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
209         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
210
211         /* XXX do something about time, uid, gid */
212         rec->cr_opcode   = REINT_OPEN;
213         rec->cr_fsuid   = cfs_curproc_fsuid();
214         rec->cr_fsgid   = cfs_curproc_fsgid();
215         rec->cr_cap      = cfs_curproc_cap_pack();
216         if (op_data != NULL) {
217                 rec->cr_fid1 = op_data->op_fid1;
218                 rec->cr_fid2 = op_data->op_fid2;
219         }
220         rec->cr_mode     = mode;
221         cr_flags = mds_pack_open_flags(flags, mode);
222         rec->cr_rdev     = rdev;
223         rec->cr_time     = op_data->op_mod_time;
224         rec->cr_suppgid1 = op_data->op_suppgids[0];
225         rec->cr_suppgid2 = op_data->op_suppgids[1];
226         rec->cr_bias     = op_data->op_bias;
227
228         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
229         /* the next buffer is child capa, which is used for replay,
230          * will be packed from the data in reply message. */
231
232         if (op_data->op_name) {
233                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
234                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
235         }
236
237         if (lmm) {
238                 cr_flags |= MDS_OPEN_HAS_EA;
239 #ifndef __KERNEL__
240                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
241                 rec->cr_fid2 = op_data->op_fid2;
242 #endif
243                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
244                 memcpy (tmp, lmm, lmmlen);
245         }
246         set_mrc_cr_flags(rec, cr_flags);
247 }
248
249 static inline __u64 attr_pack(unsigned int ia_valid) {
250         __u64 sa_valid = 0;
251
252         if (ia_valid & ATTR_MODE)
253                 sa_valid |= MDS_ATTR_MODE;
254         if (ia_valid & ATTR_UID)
255                 sa_valid |= MDS_ATTR_UID;
256         if (ia_valid & ATTR_GID)
257                 sa_valid |= MDS_ATTR_GID;
258         if (ia_valid & ATTR_SIZE)
259                 sa_valid |= MDS_ATTR_SIZE;
260         if (ia_valid & ATTR_ATIME)
261                 sa_valid |= MDS_ATTR_ATIME;
262         if (ia_valid & ATTR_MTIME)
263                 sa_valid |= MDS_ATTR_MTIME;
264         if (ia_valid & ATTR_CTIME)
265                 sa_valid |= MDS_ATTR_CTIME;
266         if (ia_valid & ATTR_ATIME_SET)
267                 sa_valid |= MDS_ATTR_ATIME_SET;
268         if (ia_valid & ATTR_MTIME_SET)
269                 sa_valid |= MDS_ATTR_MTIME_SET;
270         if (ia_valid & ATTR_FORCE)
271                 sa_valid |= MDS_ATTR_FORCE;
272         if (ia_valid & ATTR_ATTR_FLAG)
273                 sa_valid |= MDS_ATTR_ATTR_FLAG;
274         if (ia_valid & ATTR_KILL_SUID)
275                 sa_valid |=  MDS_ATTR_KILL_SUID;
276         if (ia_valid & ATTR_KILL_SGID)
277                 sa_valid |= MDS_ATTR_KILL_SGID;
278         if (ia_valid & ATTR_CTIME_SET)
279                 sa_valid |= MDS_ATTR_CTIME_SET;
280         if (ia_valid & ATTR_FROM_OPEN)
281                 sa_valid |= MDS_ATTR_FROM_OPEN;
282         if (ia_valid & ATTR_BLOCKS)
283                 sa_valid |= MDS_ATTR_BLOCKS;
284         if (ia_valid & MDS_OPEN_OWNEROVERRIDE)
285                 /* NFSD hack (see bug 5781) */
286                 sa_valid |= MDS_OPEN_OWNEROVERRIDE;
287         return sa_valid;
288 }
289
290 static void mdc_setattr_pack_rec(struct mdt_rec_setattr *rec,
291                                  struct md_op_data *op_data)
292 {
293         rec->sa_opcode  = REINT_SETATTR;
294         rec->sa_fsuid   = cfs_curproc_fsuid();
295         rec->sa_fsgid   = cfs_curproc_fsgid();
296         rec->sa_cap     = cfs_curproc_cap_pack();
297         rec->sa_suppgid = -1;
298
299         rec->sa_fid    = op_data->op_fid1;
300         rec->sa_valid  = attr_pack(op_data->op_attr.ia_valid);
301         rec->sa_mode   = op_data->op_attr.ia_mode;
302         rec->sa_uid    = op_data->op_attr.ia_uid;
303         rec->sa_gid    = op_data->op_attr.ia_gid;
304         rec->sa_size   = op_data->op_attr.ia_size;
305         rec->sa_blocks = op_data->op_attr_blocks;
306         rec->sa_atime  = LTIME_S(op_data->op_attr.ia_atime);
307         rec->sa_mtime  = LTIME_S(op_data->op_attr.ia_mtime);
308         rec->sa_ctime  = LTIME_S(op_data->op_attr.ia_ctime);
309         rec->sa_attr_flags = ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags;
310         if ((op_data->op_attr.ia_valid & ATTR_GID) &&
311             cfs_curproc_is_in_groups(op_data->op_attr.ia_gid))
312                 rec->sa_suppgid = op_data->op_attr.ia_gid;
313         else
314                 rec->sa_suppgid = op_data->op_suppgids[0];
315 }
316
317 static void mdc_ioepoch_pack(struct mdt_ioepoch *epoch,
318                              struct md_op_data *op_data)
319 {
320         memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle));
321         epoch->ioepoch = op_data->op_ioepoch;
322         epoch->flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
323 }
324
325 void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
326                       void *ea, int ealen, void *ea2, int ea2len)
327 {
328         struct mdt_rec_setattr *rec;
329         struct mdt_ioepoch *epoch;
330         struct lov_user_md *lum = NULL;
331
332         CLASSERT(sizeof(struct mdt_rec_reint) ==sizeof(struct mdt_rec_setattr));
333         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
334         mdc_setattr_pack_rec(rec, op_data);
335
336         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
337
338         if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) {
339                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
340                 mdc_ioepoch_pack(epoch, op_data);
341         }
342
343         if (ealen == 0)
344                 return;
345
346         lum = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
347         if (ea == NULL) { /* Remove LOV EA */
348                 lum->lmm_magic = LOV_USER_MAGIC_V1;
349                 lum->lmm_stripe_size = 0;
350                 lum->lmm_stripe_count = 0;
351                 lum->lmm_stripe_offset = (typeof(lum->lmm_stripe_offset))(-1);
352         } else {
353                 memcpy(lum, ea, ealen);
354         }
355
356         if (ea2len == 0)
357                 return;
358
359         memcpy(req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES), ea2,
360                ea2len);
361 }
362
363 void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
364 {
365         struct mdt_rec_unlink *rec;
366         char *tmp;
367  
368         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_unlink));
369         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
370         LASSERT (rec != NULL);
371
372         rec->ul_opcode  = REINT_UNLINK;
373         rec->ul_fsuid   = op_data->op_fsuid;
374         rec->ul_fsgid   = op_data->op_fsgid;
375         rec->ul_cap     = op_data->op_cap;
376         rec->ul_mode    = op_data->op_mode;
377         rec->ul_suppgid1= op_data->op_suppgids[0];
378         rec->ul_suppgid2= -1;
379         rec->ul_fid1    = op_data->op_fid1;
380         rec->ul_fid2    = op_data->op_fid2;
381         rec->ul_time    = op_data->op_mod_time;
382         rec->ul_bias    = op_data->op_bias;
383
384         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
385
386         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
387         LASSERT(tmp != NULL);
388         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
389 }
390
391 void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
392 {
393         struct mdt_rec_link *rec;
394         char *tmp;
395
396         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_link));
397         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
398         LASSERT (rec != NULL);
399
400         rec->lk_opcode   = REINT_LINK;
401         rec->lk_fsuid    = op_data->op_fsuid;//current->fsuid;
402         rec->lk_fsgid    = op_data->op_fsgid;//current->fsgid;
403         rec->lk_cap      = op_data->op_cap;//current->cap_effective;
404         rec->lk_suppgid1 = op_data->op_suppgids[0];
405         rec->lk_suppgid2 = op_data->op_suppgids[1];
406         rec->lk_fid1     = op_data->op_fid1;
407         rec->lk_fid2     = op_data->op_fid2;
408         rec->lk_time     = op_data->op_mod_time;
409         rec->lk_bias     = op_data->op_bias;
410
411         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
412         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
413
414         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
415         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
416 }
417
418 void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
419                      const char *old, int oldlen, const char *new, int newlen)
420 {
421         struct mdt_rec_rename *rec;
422         char *tmp;
423
424         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename));
425         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
426
427         /* XXX do something about time, uid, gid */
428         rec->rn_opcode   = REINT_RENAME;
429         rec->rn_fsuid    = op_data->op_fsuid;
430         rec->rn_fsgid    = op_data->op_fsgid;
431         rec->rn_cap      = op_data->op_cap;
432         rec->rn_suppgid1 = op_data->op_suppgids[0];
433         rec->rn_suppgid2 = op_data->op_suppgids[1];
434         rec->rn_fid1     = op_data->op_fid1;
435         rec->rn_fid2     = op_data->op_fid2;
436         rec->rn_time     = op_data->op_mod_time;
437         rec->rn_mode     = op_data->op_mode;
438         rec->rn_bias     = op_data->op_bias;
439
440         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
441         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
442
443         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
444         LOGL0(old, oldlen, tmp);
445
446         if (new) {
447                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SYMTGT);
448                 LOGL0(new, newlen, tmp);
449         }
450 }
451
452 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
453                       struct md_op_data *op_data, int ea_size)
454 {
455         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
456                                                     &RMF_MDT_BODY);
457
458         b->valid = valid;
459         if (op_data->op_bias & MDS_CHECK_SPLIT)
460                 b->valid |= OBD_MD_FLCKSPLIT;
461         if (op_data->op_bias & MDS_CROSS_REF)
462                 b->valid |= OBD_MD_FLCROSSREF;
463         b->eadatasize = ea_size;
464         b->flags = flags;
465         __mdc_pack_body(b, op_data->op_suppgids[0]);
466
467         b->fid1 = op_data->op_fid1;
468         b->fid2 = op_data->op_fid2;
469         b->valid |= OBD_MD_FLID;
470
471         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
472
473         if (op_data->op_name) {
474                 char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
475                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
476
477         }
478 }
479
480 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
481 {
482         struct mdt_ioepoch *epoch;
483         struct mdt_rec_setattr *rec;
484
485         epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
486         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
487
488         mdc_setattr_pack_rec(rec, op_data);
489         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
490         mdc_ioepoch_pack(epoch, op_data);
491 }
492
493 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
494 {
495         int rc;
496         ENTRY;
497         client_obd_list_lock(&cli->cl_loi_list_lock);
498         rc = cfs_list_empty(&mcw->mcw_entry);
499         client_obd_list_unlock(&cli->cl_loi_list_lock);
500         RETURN(rc);
501 };
502
503 /* We record requests in flight in cli->cl_r_in_flight here.
504  * There is only one write rpc possible in mdc anyway. If this to change
505  * in the future - the code may need to be revisited. */
506 int mdc_enter_request(struct client_obd *cli)
507 {
508         int rc = 0;
509         struct mdc_cache_waiter mcw;
510         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
511
512         client_obd_list_lock(&cli->cl_loi_list_lock);
513         if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
514                 cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
515                 cfs_waitq_init(&mcw.mcw_waitq);
516                 client_obd_list_unlock(&cli->cl_loi_list_lock);
517                 rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
518                 if (rc) {
519                         client_obd_list_lock(&cli->cl_loi_list_lock);
520                         if (cfs_list_empty(&mcw.mcw_entry))
521                                 cli->cl_r_in_flight--;
522                         cfs_list_del_init(&mcw.mcw_entry);
523                         client_obd_list_unlock(&cli->cl_loi_list_lock);
524                 }
525         } else {
526                 cli->cl_r_in_flight++;
527                 client_obd_list_unlock(&cli->cl_loi_list_lock);
528         }
529         return rc;
530 }
531
532 void mdc_exit_request(struct client_obd *cli)
533 {
534         cfs_list_t *l, *tmp;
535         struct mdc_cache_waiter *mcw;
536
537         client_obd_list_lock(&cli->cl_loi_list_lock);
538         cli->cl_r_in_flight--;
539         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
540                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
541                         /* No free request slots anymore */
542                         break;
543                 }
544
545                 mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
546                 cfs_list_del_init(&mcw->mcw_entry);
547                 cli->cl_r_in_flight++;
548                 cfs_waitq_signal(&mcw->mcw_waitq);
549         }
550         /* Empty waiting list? Decrease reqs in-flight number */
551
552         client_obd_list_unlock(&cli->cl_loi_list_lock);
553 }