Whamcloud - gitweb
LU-3531 llite: move dir cache to MDC layer
[fs/lustre-release.git] / lustre / mdc / mdc_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38 #ifndef __KERNEL__
39 # include <fcntl.h>
40 # include <liblustre.h>
41 #endif
42 #include <lustre_net.h>
43 #include <lustre/lustre_idl.h>
44 #include <obd.h>
45 #include <cl_object.h>
46 #include <lclient.h>
47 #include "mdc_internal.h"
48
49 #ifndef __KERNEL__
50 /* some liblustre hackings here */
51 #ifndef O_DIRECTORY
52 #define O_DIRECTORY     0
53 #endif
54 #endif
55
56 static void __mdc_pack_body(struct mdt_body *b, __u32 suppgid)
57 {
58         LASSERT (b != NULL);
59
60         b->suppgid = suppgid;
61         b->uid = current_uid();
62         b->gid = current_gid();
63         b->fsuid = current_fsuid();
64         b->fsgid = current_fsgid();
65         b->capability = cfs_curproc_cap_pack();
66 }
67
68 void mdc_pack_capa(struct ptlrpc_request *req, const struct req_msg_field *field,
69                    struct obd_capa *oc)
70 {
71         struct req_capsule *pill = &req->rq_pill;
72         struct lustre_capa *c;
73
74         if (oc == NULL) {
75                 LASSERT(req_capsule_get_size(pill, field, RCL_CLIENT) == 0);
76                 return;
77         }
78
79         c = req_capsule_client_get(pill, field);
80         LASSERT(c != NULL);
81         capa_cpy(c, oc);
82         DEBUG_CAPA(D_SEC, c, "pack");
83 }
84
85 void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
86                         const struct lu_fid *cfid, int flags)
87 {
88         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
89                                                     &RMF_MDT_BODY);
90
91         if (pfid) {
92                 b->fid1 = *pfid;
93                 b->valid = OBD_MD_FLID;
94         }
95         if (cfid)
96                 b->fid2 = *cfid;
97         b->flags = flags;
98 }
99
100 void mdc_swap_layouts_pack(struct ptlrpc_request *req,
101                            struct md_op_data *op_data)
102 {
103         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
104                                                     &RMF_MDT_BODY);
105
106         __mdc_pack_body(b, op_data->op_suppgids[0]);
107         b->fid1 = op_data->op_fid1;
108         b->fid2 = op_data->op_fid2;
109         b->valid |= OBD_MD_FLID;
110
111         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
112         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
113 }
114
115 void mdc_pack_body(struct ptlrpc_request *req,
116                    const struct lu_fid *fid, struct obd_capa *oc,
117                    __u64 valid, int ea_size, __u32 suppgid, int flags)
118 {
119         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
120                                                     &RMF_MDT_BODY);
121         LASSERT(b != NULL);
122         b->valid = valid;
123         b->eadatasize = ea_size;
124         b->flags = flags;
125         __mdc_pack_body(b, suppgid);
126         if (fid) {
127                 b->fid1 = *fid;
128                 b->valid |= OBD_MD_FLID;
129                 mdc_pack_capa(req, &RMF_CAPA1, oc);
130         }
131 }
132
133 void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff,
134                       __u32 size, const struct lu_fid *fid, struct obd_capa *oc)
135 {
136         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
137                                                     &RMF_MDT_BODY);
138         b->fid1 = *fid;
139         b->valid |= OBD_MD_FLID;
140         b->size = pgoff;                       /* !! */
141         b->nlink = size;                        /* !! */
142         __mdc_pack_body(b, -1);
143         b->mode = LUDA_FID | LUDA_TYPE;
144
145         mdc_pack_capa(req, &RMF_CAPA1, oc);
146 }
147
148 /* packing of MDS records */
149 void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
150                      const void *data, int datalen, __u32 mode,
151                      __u32 uid, __u32 gid, cfs_cap_t cap_effective, __u64 rdev)
152 {
153         struct mdt_rec_create   *rec;
154         char                    *tmp;
155         __u64                    flags;
156
157         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
158         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
159
160
161         rec->cr_opcode   = REINT_CREATE;
162         rec->cr_fsuid    = uid;
163         rec->cr_fsgid    = gid;
164         rec->cr_cap      = cap_effective;
165         rec->cr_fid1     = op_data->op_fid1;
166         rec->cr_fid2     = op_data->op_fid2;
167         rec->cr_mode     = mode;
168         rec->cr_rdev     = rdev;
169         rec->cr_time     = op_data->op_mod_time;
170         rec->cr_suppgid1 = op_data->op_suppgids[0];
171         rec->cr_suppgid2 = op_data->op_suppgids[1];
172         flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
173         if (op_data->op_bias & MDS_CREATE_VOLATILE)
174                 flags |= MDS_OPEN_VOLATILE;
175         set_mrc_cr_flags(rec, flags);
176         rec->cr_bias     = op_data->op_bias;
177         rec->cr_umask    = current_umask();
178
179         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
180
181         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
182         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
183
184         if (data) {
185                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
186                 memcpy(tmp, data, datalen);
187         }
188 }
189
190 static __u64 mds_pack_open_flags(__u64 flags, __u32 mode)
191 {
192         __u64 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
193                                    MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS |
194                                    MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK |
195                                    MDS_OPEN_BY_FID | MDS_OPEN_LEASE |
196                                    MDS_OPEN_RELEASE));
197         if (flags & O_CREAT)
198                 cr_flags |= MDS_OPEN_CREAT;
199         if (flags & O_EXCL)
200                 cr_flags |= MDS_OPEN_EXCL;
201         if (flags & O_TRUNC)
202                 cr_flags |= MDS_OPEN_TRUNC;
203         if (flags & O_APPEND)
204                 cr_flags |= MDS_OPEN_APPEND;
205         if (flags & O_SYNC)
206                 cr_flags |= MDS_OPEN_SYNC;
207         if (flags & O_DIRECTORY)
208                 cr_flags |= MDS_OPEN_DIRECTORY;
209 #ifdef FMODE_EXEC
210         if (flags & FMODE_EXEC)
211                 cr_flags |= MDS_FMODE_EXEC;
212 #endif
213         if (flags & O_LOV_DELAY_CREATE)
214                 cr_flags |= MDS_OPEN_DELAY_CREATE;
215
216         if ((flags & O_NOACCESS) || (flags & O_NONBLOCK))
217                 cr_flags |= MDS_OPEN_NORESTORE;
218
219         return cr_flags;
220 }
221
222 /* packing of MDS records */
223 void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
224                    __u32 mode, __u64 rdev, __u64 flags, const void *lmm,
225                    int lmmlen)
226 {
227         struct mdt_rec_create *rec;
228         char *tmp;
229         __u64 cr_flags;
230
231         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
232         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
233
234         /* XXX do something about time, uid, gid */
235         rec->cr_opcode = REINT_OPEN;
236         rec->cr_fsuid  = current_fsuid();
237         rec->cr_fsgid  = current_fsgid();
238         rec->cr_cap    = cfs_curproc_cap_pack();
239         rec->cr_mode   = mode;
240         cr_flags = mds_pack_open_flags(flags, mode);
241         rec->cr_rdev   = rdev;
242         rec->cr_umask  = current_umask();
243         if (op_data != NULL) {
244                 rec->cr_fid1       = op_data->op_fid1;
245                 rec->cr_fid2       = op_data->op_fid2;
246                 rec->cr_time       = op_data->op_mod_time;
247                 rec->cr_suppgid1   = op_data->op_suppgids[0];
248                 rec->cr_suppgid2   = op_data->op_suppgids[1];
249                 rec->cr_bias       = op_data->op_bias;
250                 rec->cr_old_handle = op_data->op_handle;
251
252                 mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
253                 /* the next buffer is child capa, which is used for replay,
254                  * will be packed from the data in reply message. */
255
256                 if (op_data->op_name) {
257                         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
258                         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
259                         if (op_data->op_bias & MDS_CREATE_VOLATILE)
260                                 cr_flags |= MDS_OPEN_VOLATILE;
261                 }
262 #ifndef __KERNEL__
263                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
264                 rec->cr_fid2 = op_data->op_fid2;
265 #endif
266         }
267
268         if (lmm) {
269                 cr_flags |= MDS_OPEN_HAS_EA;
270                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
271                 memcpy(tmp, lmm, lmmlen);
272         }
273         set_mrc_cr_flags(rec, cr_flags);
274 }
275
276 static inline __u64 attr_pack(unsigned int ia_valid) {
277         __u64 sa_valid = 0;
278
279         if (ia_valid & ATTR_MODE)
280                 sa_valid |= MDS_ATTR_MODE;
281         if (ia_valid & ATTR_UID)
282                 sa_valid |= MDS_ATTR_UID;
283         if (ia_valid & ATTR_GID)
284                 sa_valid |= MDS_ATTR_GID;
285         if (ia_valid & ATTR_SIZE)
286                 sa_valid |= MDS_ATTR_SIZE;
287         if (ia_valid & ATTR_ATIME)
288                 sa_valid |= MDS_ATTR_ATIME;
289         if (ia_valid & ATTR_MTIME)
290                 sa_valid |= MDS_ATTR_MTIME;
291         if (ia_valid & ATTR_CTIME)
292                 sa_valid |= MDS_ATTR_CTIME;
293         if (ia_valid & ATTR_ATIME_SET)
294                 sa_valid |= MDS_ATTR_ATIME_SET;
295         if (ia_valid & ATTR_MTIME_SET)
296                 sa_valid |= MDS_ATTR_MTIME_SET;
297         if (ia_valid & ATTR_FORCE)
298                 sa_valid |= MDS_ATTR_FORCE;
299         if (ia_valid & ATTR_ATTR_FLAG)
300                 sa_valid |= MDS_ATTR_ATTR_FLAG;
301         if (ia_valid & ATTR_KILL_SUID)
302                 sa_valid |=  MDS_ATTR_KILL_SUID;
303         if (ia_valid & ATTR_KILL_SGID)
304                 sa_valid |= MDS_ATTR_KILL_SGID;
305         if (ia_valid & ATTR_CTIME_SET)
306                 sa_valid |= MDS_ATTR_CTIME_SET;
307         if (ia_valid & ATTR_FROM_OPEN)
308                 sa_valid |= MDS_ATTR_FROM_OPEN;
309         if (ia_valid & ATTR_BLOCKS)
310                 sa_valid |= MDS_ATTR_BLOCKS;
311         if (ia_valid & MDS_OPEN_OWNEROVERRIDE)
312                 /* NFSD hack (see bug 5781) */
313                 sa_valid |= MDS_OPEN_OWNEROVERRIDE;
314         return sa_valid;
315 }
316
317 static void mdc_setattr_pack_rec(struct mdt_rec_setattr *rec,
318                                  struct md_op_data *op_data)
319 {
320         rec->sa_opcode  = REINT_SETATTR;
321         rec->sa_fsuid   = current_fsuid();
322         rec->sa_fsgid   = current_fsgid();
323         rec->sa_cap     = cfs_curproc_cap_pack();
324         rec->sa_suppgid = -1;
325
326         rec->sa_fid    = op_data->op_fid1;
327         rec->sa_valid  = attr_pack(op_data->op_attr.ia_valid);
328         rec->sa_mode   = op_data->op_attr.ia_mode;
329         rec->sa_uid    = op_data->op_attr.ia_uid;
330         rec->sa_gid    = op_data->op_attr.ia_gid;
331         rec->sa_size   = op_data->op_attr.ia_size;
332         rec->sa_blocks = op_data->op_attr_blocks;
333         rec->sa_atime  = LTIME_S(op_data->op_attr.ia_atime);
334         rec->sa_mtime  = LTIME_S(op_data->op_attr.ia_mtime);
335         rec->sa_ctime  = LTIME_S(op_data->op_attr.ia_ctime);
336         rec->sa_attr_flags = ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags;
337         if ((op_data->op_attr.ia_valid & ATTR_GID) &&
338             in_group_p(op_data->op_attr.ia_gid))
339                 rec->sa_suppgid = op_data->op_attr.ia_gid;
340         else
341                 rec->sa_suppgid = op_data->op_suppgids[0];
342
343         rec->sa_bias = op_data->op_bias;
344 }
345
346 static void mdc_ioepoch_pack(struct mdt_ioepoch *epoch,
347                              struct md_op_data *op_data)
348 {
349         memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle));
350         epoch->ioepoch = op_data->op_ioepoch;
351         epoch->flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
352 }
353
354 void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
355                       void *ea, int ealen, void *ea2, int ea2len)
356 {
357         struct mdt_rec_setattr *rec;
358         struct mdt_ioepoch *epoch;
359         struct lov_user_md *lum = NULL;
360
361         CLASSERT(sizeof(struct mdt_rec_reint) ==sizeof(struct mdt_rec_setattr));
362         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
363         mdc_setattr_pack_rec(rec, op_data);
364
365         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
366
367         if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) {
368                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
369                 mdc_ioepoch_pack(epoch, op_data);
370         }
371
372         if (ealen == 0)
373                 return;
374
375         lum = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
376         if (ea == NULL) { /* Remove LOV EA */
377                 lum->lmm_magic = LOV_USER_MAGIC_V1;
378                 lum->lmm_stripe_size = 0;
379                 lum->lmm_stripe_count = 0;
380                 lum->lmm_stripe_offset = (typeof(lum->lmm_stripe_offset))(-1);
381         } else {
382                 memcpy(lum, ea, ealen);
383         }
384
385         if (ea2len == 0)
386                 return;
387
388         memcpy(req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES), ea2,
389                ea2len);
390 }
391
392 void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
393 {
394         struct mdt_rec_unlink *rec;
395         char *tmp;
396
397         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_unlink));
398         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
399         LASSERT(rec != NULL);
400
401         rec->ul_opcode  = op_data->op_cli_flags & CLI_RM_ENTRY ?
402                                         REINT_RMENTRY : REINT_UNLINK;
403         rec->ul_fsuid   = op_data->op_fsuid;
404         rec->ul_fsgid   = op_data->op_fsgid;
405         rec->ul_cap     = op_data->op_cap;
406         rec->ul_mode    = op_data->op_mode;
407         rec->ul_suppgid1= op_data->op_suppgids[0];
408         rec->ul_suppgid2= -1;
409         rec->ul_fid1    = op_data->op_fid1;
410         rec->ul_fid2    = op_data->op_fid2;
411         rec->ul_time    = op_data->op_mod_time;
412         rec->ul_bias    = op_data->op_bias;
413
414         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
415
416         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
417         LASSERT(tmp != NULL);
418         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
419 }
420
421 void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
422 {
423         struct mdt_rec_link *rec;
424         char *tmp;
425
426         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_link));
427         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
428         LASSERT (rec != NULL);
429
430         rec->lk_opcode   = REINT_LINK;
431         rec->lk_fsuid    = op_data->op_fsuid;//current->fsuid;
432         rec->lk_fsgid    = op_data->op_fsgid;//current->fsgid;
433         rec->lk_cap      = op_data->op_cap;//current->cap_effective;
434         rec->lk_suppgid1 = op_data->op_suppgids[0];
435         rec->lk_suppgid2 = op_data->op_suppgids[1];
436         rec->lk_fid1     = op_data->op_fid1;
437         rec->lk_fid2     = op_data->op_fid2;
438         rec->lk_time     = op_data->op_mod_time;
439         rec->lk_bias     = op_data->op_bias;
440
441         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
442         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
443
444         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
445         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
446 }
447
448 void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
449                      const char *old, int oldlen, const char *new, int newlen)
450 {
451         struct mdt_rec_rename *rec;
452         char *tmp;
453
454         CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename));
455         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
456
457         /* XXX do something about time, uid, gid */
458         rec->rn_opcode   = REINT_RENAME;
459         rec->rn_fsuid    = op_data->op_fsuid;
460         rec->rn_fsgid    = op_data->op_fsgid;
461         rec->rn_cap      = op_data->op_cap;
462         rec->rn_suppgid1 = op_data->op_suppgids[0];
463         rec->rn_suppgid2 = op_data->op_suppgids[1];
464         rec->rn_fid1     = op_data->op_fid1;
465         rec->rn_fid2     = op_data->op_fid2;
466         rec->rn_time     = op_data->op_mod_time;
467         rec->rn_mode     = op_data->op_mode;
468         rec->rn_bias     = op_data->op_bias;
469
470         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
471         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
472
473         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
474         LOGL0(old, oldlen, tmp);
475
476         if (new) {
477                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SYMTGT);
478                 LOGL0(new, newlen, tmp);
479         }
480 }
481
482 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
483                       struct md_op_data *op_data, int ea_size)
484 {
485         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
486                                                     &RMF_MDT_BODY);
487
488         b->valid = valid;
489         if (op_data->op_bias & MDS_CHECK_SPLIT)
490                 b->valid |= OBD_MD_FLCKSPLIT;
491         if (op_data->op_bias & MDS_CROSS_REF)
492                 b->valid |= OBD_MD_FLCROSSREF;
493         b->eadatasize = ea_size;
494         b->flags = flags;
495         __mdc_pack_body(b, op_data->op_suppgids[0]);
496
497         b->fid1 = op_data->op_fid1;
498         b->fid2 = op_data->op_fid2;
499         b->valid |= OBD_MD_FLID;
500
501         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
502
503         if (op_data->op_name) {
504                 char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
505                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
506
507         }
508 }
509
510 static void mdc_hsm_release_pack(struct ptlrpc_request *req,
511                                  struct md_op_data *op_data)
512 {
513         if (op_data->op_bias & MDS_HSM_RELEASE) {
514                 struct close_data *data;
515                 struct ldlm_lock *lock;
516
517                 data = req_capsule_client_get(&req->rq_pill, &RMF_CLOSE_DATA);
518                 LASSERT(data != NULL);
519
520                 lock = ldlm_handle2lock(&op_data->op_lease_handle);
521                 if (lock != NULL) {
522                         data->cd_handle = lock->l_remote_handle;
523                         LDLM_LOCK_PUT(lock);
524                 }
525                 ldlm_cli_cancel(&op_data->op_lease_handle, LCF_LOCAL);
526
527                 data->cd_data_version = op_data->op_data_version;
528                 data->cd_fid = op_data->op_fid2;
529         }
530 }
531
532 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
533 {
534         struct mdt_ioepoch *epoch;
535         struct mdt_rec_setattr *rec;
536
537         epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
538         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
539
540         mdc_setattr_pack_rec(rec, op_data);
541         mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
542         mdc_ioepoch_pack(epoch, op_data);
543         mdc_hsm_release_pack(req, op_data);
544 }
545
546 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
547 {
548         int rc;
549         ENTRY;
550         client_obd_list_lock(&cli->cl_loi_list_lock);
551         rc = cfs_list_empty(&mcw->mcw_entry);
552         client_obd_list_unlock(&cli->cl_loi_list_lock);
553         RETURN(rc);
554 };
555
556 /* We record requests in flight in cli->cl_r_in_flight here.
557  * There is only one write rpc possible in mdc anyway. If this to change
558  * in the future - the code may need to be revisited. */
559 int mdc_enter_request(struct client_obd *cli)
560 {
561         int rc = 0;
562         struct mdc_cache_waiter mcw;
563         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
564
565         client_obd_list_lock(&cli->cl_loi_list_lock);
566         if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
567                 cfs_list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
568                 init_waitqueue_head(&mcw.mcw_waitq);
569                 client_obd_list_unlock(&cli->cl_loi_list_lock);
570                 rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
571                 if (rc) {
572                         client_obd_list_lock(&cli->cl_loi_list_lock);
573                         if (cfs_list_empty(&mcw.mcw_entry))
574                                 cli->cl_r_in_flight--;
575                         cfs_list_del_init(&mcw.mcw_entry);
576                         client_obd_list_unlock(&cli->cl_loi_list_lock);
577                 }
578         } else {
579                 cli->cl_r_in_flight++;
580                 client_obd_list_unlock(&cli->cl_loi_list_lock);
581         }
582         return rc;
583 }
584
585 void mdc_exit_request(struct client_obd *cli)
586 {
587         cfs_list_t *l, *tmp;
588         struct mdc_cache_waiter *mcw;
589
590         client_obd_list_lock(&cli->cl_loi_list_lock);
591         cli->cl_r_in_flight--;
592         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
593                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
594                         /* No free request slots anymore */
595                         break;
596                 }
597
598                 mcw = cfs_list_entry(l, struct mdc_cache_waiter, mcw_entry);
599                 cfs_list_del_init(&mcw->mcw_entry);
600                 cli->cl_r_in_flight++;
601                 wake_up(&mcw->mcw_waitq);
602         }
603         /* Empty waiting list? Decrease reqs in-flight number */
604
605         client_obd_list_unlock(&cli->cl_loi_list_lock);
606 }