Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / mdc / mdc_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38 #ifndef __KERNEL__
39 # include <fcntl.h>
40 # include <liblustre.h>
41 #endif
42 #include <lustre/lustre_idl.h>
43 #include <lustre_net.h>
44 #include "mdc_internal.h"
45
46 #ifndef __KERNEL__
47 /* some liblustre hackings here */
48 #ifndef O_DIRECTORY
49 #define O_DIRECTORY     0
50 #endif
51 #endif
52
53 static void mdc_readdir_pack_18(struct ptlrpc_request *req, int offset,
54                                 __u64 pg_off, __u32 size, struct ll_fid *fid)
55 {
56         struct mds_body *b;
57         ENTRY;
58
59         CLASSERT(sizeof(struct ll_fid)   == sizeof(struct lu_fid));
60         CLASSERT(sizeof(struct mds_body) <= sizeof(struct mdt_body));
61         CLASSERT((int)offsetof(struct mds_body, max_cookiesize) == 
62                  (int)offsetof(struct mdt_body, max_cookiesize));
63
64
65         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
66         b->fsuid = current->fsuid;
67         b->fsgid = current->fsgid;
68         b->capability = cfs_curproc_cap_pack();
69         b->fid1 = *fid;
70         b->size = pg_off;                       /* !! */
71         b->suppgid = -1;
72         b->nlink = size;                        /* !! */
73         EXIT;
74 }
75
76 static void mdc_readdir_pack_20(struct ptlrpc_request *req, int offset,
77                                 __u64 pg_off, __u32 size, struct ll_fid *fid)
78 {
79         struct mdt_body *b;
80         ENTRY;
81
82         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
83         b->fsuid = current->fsuid;
84         b->fsgid = current->fsgid;
85         b->capability = cfs_curproc_cap_pack();
86
87         if (fid) {
88                 b->fid1 = *((struct lu_fid*)fid);
89                 b->valid |= OBD_MD_FLID;
90         }
91         b->size = pg_off;                       /* !! */
92         b->suppgid = -1;
93         b->nlink = size;                        /* !! */
94         EXIT;
95 }
96
97 void mdc_readdir_pack(struct ptlrpc_request *req, int offset,
98                       __u64 pg_off, __u32 size, struct ll_fid *fid)
99 {
100         if (mdc_req_is_2_0_server(req))
101                 mdc_readdir_pack_20(req, offset, pg_off, size, fid);
102         else
103                 mdc_readdir_pack_18(req, offset, pg_off, size, fid);
104 }
105
106 static void mdc_pack_req_body_18(struct ptlrpc_request *req, int offset,
107                                  __u64 valid, struct ll_fid *fid, int ea_size,
108                                  int flags)
109 {
110         struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
111         ENTRY;
112         LASSERT (b != NULL);
113
114         if (fid)
115                 b->fid1 = *fid;
116         b->valid = valid;
117         b->eadatasize = ea_size;
118         b->flags = flags;
119         b->fsuid = current->fsuid;
120         b->fsgid = current->fsgid;
121         b->capability = cfs_curproc_cap_pack();
122         EXIT;
123 }
124
125 static void mdc_pack_req_body_20(struct ptlrpc_request *req, int offset,
126                                  __u64 valid, struct ll_fid *fid, int ea_size,
127                                  int flags)
128 {
129         struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
130         ENTRY;
131         LASSERT (b != NULL);
132
133         b->valid      = valid;
134         b->eadatasize = ea_size;
135         b->flags      = flags;
136         if (fid) {
137                 b->fid1 = *((struct lu_fid*)fid);
138                 b->valid |= OBD_MD_FLID;
139         }
140
141         b->fsuid = current->fsuid;
142         b->fsgid = current->fsgid;
143         b->capability = cfs_curproc_cap_pack();
144         EXIT;
145 }
146
147 void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
148                        __u64 valid, struct ll_fid *fid, int ea_size,
149                        int flags)
150 {
151         if (mdc_req_is_2_0_server(req))
152                 mdc_pack_req_body_20(req, offset, valid, fid, ea_size, flags);
153         else
154                 mdc_pack_req_body_18(req, offset, valid, fid, ea_size, flags);
155 }
156
157 /* packing of MDS records */
158 static void mdc_create_pack_18(struct ptlrpc_request *req, int offset,
159                                struct mdc_op_data *op_data, const void *data,
160                                int datalen, __u32 mode, __u32 uid, __u32 gid,
161                                cfs_cap_t cap_effective, __u64 rdev)
162 {
163         struct mds_rec_create *rec;
164         char *tmp;
165         ENTRY;
166
167         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
168
169         rec->cr_opcode  = REINT_CREATE;
170         rec->cr_fsuid   = uid;
171         rec->cr_fsgid   = gid;
172         rec->cr_cap     = cap_effective;
173         rec->cr_fid     = op_data->fid1;
174         memset(&rec->cr_replayfid, 0, sizeof(rec->cr_replayfid));
175         rec->cr_mode    = mode;
176         rec->cr_rdev    = rdev;
177         rec->cr_time    = op_data->mod_time;
178         rec->cr_suppgid = op_data->suppgids[0];
179
180         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, op_data->namelen + 1);
181         LOGL0(op_data->name, op_data->namelen, tmp);
182
183         if (data) {
184                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, datalen);
185                 memcpy (tmp, data, datalen);
186         }
187         EXIT;
188 }
189
190 static void mdc_create_pack_20(struct ptlrpc_request *req, int offset,
191                                struct mdc_op_data *op_data, const void *data,
192                                int datalen, __u32 mode, __u32 uid, __u32 gid,
193                                cfs_cap_t cap_effective, __u64 rdev)
194 {
195         struct mdt_rec_create *rec;
196         char *tmp;
197         ENTRY;
198
199         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
200
201         rec->cr_opcode   = REINT_CREATE;
202         rec->cr_fsuid    = uid;
203         rec->cr_fsgid    = gid;
204         rec->cr_cap      = cap_effective;
205         memcpy(&rec->cr_fid1, &op_data->fid1, sizeof(op_data->fid1));
206         memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
207         rec->cr_mode     = mode;
208         rec->cr_rdev     = rdev;
209         rec->cr_time     = op_data->mod_time;
210         rec->cr_suppgid1 = op_data->suppgids[0];
211
212         /* offset + 1  == capa */
213         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, op_data->namelen + 1);
214         LOGL0(op_data->name, op_data->namelen, tmp);
215
216         if (data) {
217                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, datalen);
218                 memcpy(tmp, data, datalen);
219         }
220         EXIT;
221 }
222
223 void mdc_create_pack(struct ptlrpc_request *req, int offset,
224                      struct mdc_op_data *op_data, const void *data,
225                      int datalen, __u32 mode, __u32 uid, __u32 gid,
226                      cfs_cap_t cap_effective, __u64 rdev)
227 {
228         if (mdc_req_is_2_0_server(req))
229                 mdc_create_pack_20(req, offset, op_data, data, datalen,
230                                    mode, uid, gid, cap_effective, rdev);
231         else
232                 mdc_create_pack_18(req, offset, op_data, data, datalen,
233                                    mode, uid, gid, cap_effective, rdev);
234 }
235
236 static __u32 mds_pack_open_flags(__u32 flags)
237 {
238         __u32 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
239                                    MDS_OPEN_DELAY_CREATE | MDS_OPEN_HAS_EA |
240                                    MDS_OPEN_HAS_OBJS | MDS_OPEN_OWNEROVERRIDE |
241                                    MDS_OPEN_LOCK));
242         if (flags & O_CREAT)
243                 cr_flags |= MDS_OPEN_CREAT;
244         if (flags & O_EXCL)
245                 cr_flags |= MDS_OPEN_EXCL;
246         if (flags & O_TRUNC)
247                 cr_flags |= MDS_OPEN_TRUNC;
248         if (flags & O_APPEND)
249                 cr_flags |= MDS_OPEN_APPEND;
250         if (flags & O_SYNC)
251                 cr_flags |= MDS_OPEN_SYNC;
252         if (flags & O_DIRECTORY)
253                 cr_flags |= MDS_OPEN_DIRECTORY;
254         if (flags & O_JOIN_FILE)
255                 cr_flags |= MDS_OPEN_JOIN_FILE;
256 #ifdef FMODE_EXEC
257         if (flags & FMODE_EXEC)
258                 cr_flags |= MDS_FMODE_EXEC;
259 #endif
260         return cr_flags;
261 }
262
263 /* packing of MDS records */
264 static void mdc_join_pack_18(struct ptlrpc_request *req, int offset,
265                              struct mdc_op_data *op_data, __u64 head_size)
266 {
267         struct mds_rec_join *rec;
268         ENTRY;
269
270         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
271         LASSERT(rec != NULL);
272         rec->jr_fid = op_data->fid2;
273         rec->jr_headsize = head_size;
274         EXIT;
275 }
276
277 static void mdc_join_pack_20(struct ptlrpc_request *req, int offset,
278                              struct mdc_op_data *op_data, __u64 head_size)
279 {
280         struct mdt_rec_join *rec;
281         ENTRY;
282
283         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
284         LASSERT(rec != NULL);
285         memcpy(&rec->jr_fid, &op_data->fid2, sizeof(op_data->fid2));
286         rec->jr_headsize = head_size;
287         EXIT;
288 }
289
290 void mdc_join_pack(struct ptlrpc_request *req, int offset,
291                    struct mdc_op_data *op_data, __u64 head_size)
292 {
293         if (mdc_req_is_2_0_server(req))
294                 mdc_join_pack_20(req, offset, op_data, head_size);
295         else
296                 mdc_join_pack_18(req, offset, op_data, head_size);
297 }
298
299 static void mdc_open_pack_18(struct ptlrpc_request *req, int offset,
300                             struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
301                              __u32 flags, const void *lmm, int lmmlen)
302 {
303         struct mds_rec_create *rec;
304         char *tmp;
305         ENTRY;
306
307         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
308
309         /* XXX do something about time, uid, gid */
310         rec->cr_opcode  = REINT_OPEN;
311         rec->cr_fsuid   = current->fsuid;
312         rec->cr_fsgid   = current->fsgid;
313         rec->cr_cap     = cfs_curproc_cap_pack();
314         rec->cr_fid     = op_data->fid1;
315         memset(&rec->cr_replayfid, 0, sizeof(rec->cr_replayfid));
316         rec->cr_mode    = mode;
317         rec->cr_flags   = mds_pack_open_flags(flags);
318         rec->cr_rdev    = rdev;
319         rec->cr_time    = op_data->mod_time;
320         rec->cr_suppgid = op_data->suppgids[0];
321
322         if (op_data->name) {
323                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
324                                      op_data->namelen + 1);
325                 LOGL0(op_data->name, op_data->namelen, tmp);
326         }
327
328         if (lmm) {
329                 rec->cr_flags |= MDS_OPEN_HAS_EA;
330 #ifndef __KERNEL__
331                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
332                 rec->cr_replayfid = op_data->fid2;
333 #endif
334                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, lmmlen);
335                 memcpy (tmp, lmm, lmmlen);
336         }
337         EXIT;
338 }
339
340 static void mdc_open_pack_20(struct ptlrpc_request *req, int offset,
341                             struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
342                              __u32 flags, const void *lmm, int lmmlen)
343 {
344         struct mdt_rec_create *rec;
345         char *tmp;
346         ENTRY;
347
348         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
349
350         /* XXX do something about time, uid, gid */
351         rec->cr_opcode = REINT_OPEN;
352         rec->cr_fsuid  = current->fsuid;
353         rec->cr_fsgid  = current->fsgid;
354         rec->cr_cap    = cfs_curproc_cap_pack();
355         memcpy(&rec->cr_fid1, &op_data->fid1, sizeof(op_data->fid1));
356         memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
357         rec->cr_mode   = mode;
358         rec->cr_flags  = mds_pack_open_flags(flags);
359         rec->cr_rdev   = rdev;
360         rec->cr_time   = op_data->mod_time;
361         rec->cr_suppgid1 = op_data->suppgids[0];
362         rec->cr_suppgid2 = op_data->suppgids[1];
363
364         if (op_data->name) {
365                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3,
366                                      op_data->namelen + 1);
367                 CDEBUG(D_INFO, "offset=%d, src=%p(%d):%s, dst=%p\n",
368                         offset, op_data->name, op_data->namelen,
369                         op_data->name, tmp);
370                 LASSERT(tmp);
371                 LOGL0(op_data->name, op_data->namelen, tmp);
372         }
373
374         if (lmm) {
375                 rec->cr_flags |= MDS_OPEN_HAS_EA;
376 #ifndef __KERNEL__
377                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
378                 memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
379 #endif
380                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, lmmlen);
381                 memcpy(tmp, lmm, lmmlen);
382         }
383         EXIT;
384 }
385
386 void mdc_open_pack(struct ptlrpc_request *req, int offset,
387                    struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
388                    __u32 flags, const void *lmm, int lmmlen)
389 {
390         if (mdc_req_is_2_0_server(req))
391                 mdc_open_pack_20(req, offset, op_data, mode, rdev,
392                                  flags, lmm, lmmlen);
393         else
394                 mdc_open_pack_18(req, offset, op_data, mode, rdev,
395                                  flags, lmm, lmmlen);
396
397 }
398
399 static inline __u64 attr_pack(unsigned int ia_valid) {
400         __u64 sa_valid = 0;
401
402         if (ia_valid & ATTR_MODE)
403                 sa_valid |= MDS_ATTR_MODE;
404         if (ia_valid & ATTR_UID)
405                 sa_valid |= MDS_ATTR_UID;
406         if (ia_valid & ATTR_GID)
407                 sa_valid |= MDS_ATTR_GID;
408         if (ia_valid & ATTR_SIZE)
409                 sa_valid |= MDS_ATTR_SIZE;
410         if (ia_valid & ATTR_ATIME)
411                 sa_valid |= MDS_ATTR_ATIME;
412         if (ia_valid & ATTR_MTIME)
413                 sa_valid |= MDS_ATTR_MTIME;
414         if (ia_valid & ATTR_CTIME)
415                 sa_valid |= MDS_ATTR_CTIME;
416         if (ia_valid & ATTR_ATIME_SET)
417                 sa_valid |= MDS_ATTR_ATIME_SET;
418         if (ia_valid & ATTR_MTIME_SET)
419                 sa_valid |= MDS_ATTR_MTIME_SET;
420         if (ia_valid & ATTR_FORCE)
421                 sa_valid |= MDS_ATTR_FORCE;
422         if (ia_valid & ATTR_ATTR_FLAG)
423                 sa_valid |= MDS_ATTR_ATTR_FLAG;
424         if (ia_valid & ATTR_KILL_SUID)
425                 sa_valid |=  MDS_ATTR_KILL_SUID;
426         if (ia_valid & ATTR_KILL_SGID)
427                 sa_valid |= MDS_ATTR_KILL_SGID;
428         if (ia_valid & ATTR_CTIME_SET)
429                 sa_valid |= MDS_ATTR_CTIME_SET;
430         if (ia_valid & ATTR_FROM_OPEN)
431                 sa_valid |= MDS_ATTR_FROM_OPEN;
432         if (ia_valid & MDS_OPEN_OWNEROVERRIDE)
433                 /* NFSD hack (see bug 5781) */
434                 sa_valid |= MDS_OPEN_OWNEROVERRIDE;
435         return sa_valid;
436 }
437
438 void mdc_setattr_pack_18(struct ptlrpc_request *req, int offset,
439                          struct mdc_op_data *data, struct iattr *iattr, void *ea,
440                          int ealen, void *ea2, int ea2len)
441 {
442         struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
443                                                      sizeof(*rec));
444         ENTRY;
445
446         rec->sa_opcode = REINT_SETATTR;
447         rec->sa_fsuid = current->fsuid;
448         rec->sa_fsgid = current->fsgid;
449         rec->sa_cap = cfs_curproc_cap_pack();
450         rec->sa_fid = data->fid1;
451         rec->sa_suppgid = -1;
452
453         if (iattr) {
454                 rec->sa_valid = attr_pack(iattr->ia_valid);
455                 rec->sa_mode = iattr->ia_mode;
456                 rec->sa_uid = iattr->ia_uid;
457                 rec->sa_gid = iattr->ia_gid;
458                 rec->sa_size = iattr->ia_size;
459                 rec->sa_atime = LTIME_S(iattr->ia_atime);
460                 rec->sa_mtime = LTIME_S(iattr->ia_mtime);
461                 rec->sa_ctime = LTIME_S(iattr->ia_ctime);
462                 rec->sa_attr_flags =
463                                ((struct ll_iattr_struct *)iattr)->ia_attr_flags;
464                 if ((iattr->ia_valid & ATTR_GID) && in_group_p(iattr->ia_gid))
465                         rec->sa_suppgid = iattr->ia_gid;
466                 else
467                         rec->sa_suppgid = data->suppgids[0];
468         }
469
470         if (ealen == 0) {
471                 EXIT;
472                 return;
473         }
474
475         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 1, ealen), ea, ealen);
476
477         if (ea2len == 0) {
478                 EXIT;
479                 return;
480         }
481         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ea2len), ea2, ea2len);
482
483         EXIT;
484 }
485
486 static void mdc_setattr_pack_20(struct ptlrpc_request *req, int offset,
487                                 struct mdc_op_data *data, struct iattr *iattr,
488                                 void *ea, int ealen, void *ea2, int ea2len)
489 {
490         struct mdt_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
491                                                      sizeof(*rec));
492         ENTRY;
493
494         rec->sa_opcode  = REINT_SETATTR;
495         rec->sa_fsuid   = current->fsuid;
496         rec->sa_fsgid   = current->fsgid;
497         rec->sa_cap     = cfs_curproc_cap_pack();
498         memcpy(&rec->sa_fid, &data->fid1, sizeof(data->fid1));
499         rec->sa_suppgid = -1;
500
501         if (iattr) {
502                 rec->sa_valid   = attr_pack(iattr->ia_valid);
503                 rec->sa_mode    = iattr->ia_mode;
504                 rec->sa_uid     = iattr->ia_uid;
505                 rec->sa_gid     = iattr->ia_gid;
506                 rec->sa_size    = iattr->ia_size;
507 //              rec->sa_blocks  = iattr->ia_blocks;
508                 rec->sa_atime   = LTIME_S(iattr->ia_atime);
509                 rec->sa_mtime   = LTIME_S(iattr->ia_mtime);
510                 rec->sa_ctime   = LTIME_S(iattr->ia_ctime);
511                 rec->sa_attr_flags = 
512                         ((struct ll_iattr_struct *)iattr)->ia_attr_flags;
513                 if ((iattr->ia_valid & ATTR_GID) && in_group_p(iattr->ia_gid))
514                         rec->sa_suppgid = iattr->ia_gid;
515                 else
516                         rec->sa_suppgid = data->suppgids[0];
517         }
518         if (ealen == 0) {
519                 EXIT;
520                 return;
521         }
522         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 3, ealen), ea, ealen);
523
524         if (ea2len == 0) {
525                 EXIT;
526                 return;
527         }
528         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 4, ea2len), ea2, ea2len);
529         EXIT;
530 }
531
532 void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
533                       struct mdc_op_data *data, struct iattr *iattr,
534                       void *ea, int ealen, void *ea2, int ea2len)
535 {
536         if (mdc_req_is_2_0_server(req))
537                 mdc_setattr_pack_20(req, offset, data, iattr,
538                                     ea, ealen, ea2, ea2len);
539         else
540                 mdc_setattr_pack_18(req, offset, data, iattr,
541                                     ea, ealen, ea2, ea2len);
542 }
543
544 static void mdc_unlink_pack_18(struct ptlrpc_request *req, int offset,
545                                struct mdc_op_data *data)
546 {
547         struct mds_rec_unlink *rec;
548         char *tmp;
549         ENTRY;
550
551         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
552         LASSERT (rec != NULL);
553
554         rec->ul_opcode = REINT_UNLINK;
555         rec->ul_fsuid = current->fsuid;
556         rec->ul_fsgid = current->fsgid;
557         rec->ul_cap = cfs_curproc_cap_pack();
558         rec->ul_mode = data->create_mode;
559         rec->ul_suppgid = data->suppgids[0];
560         rec->ul_fid1 = data->fid1;
561         rec->ul_fid2 = data->fid2;
562         rec->ul_time = data->mod_time;
563
564         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, data->namelen + 1);
565         LASSERT (tmp != NULL);
566         LOGL0(data->name, data->namelen, tmp);
567         EXIT;
568 }
569
570 static void mdc_unlink_pack_20(struct ptlrpc_request *req, int offset,
571                                struct mdc_op_data *data)
572 {
573         struct mdt_rec_unlink *rec;
574         char *tmp;
575         ENTRY;
576
577         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
578         LASSERT (rec != NULL);
579
580         rec->ul_opcode  = REINT_UNLINK;
581         rec->ul_fsuid   = current->fsuid;
582         rec->ul_fsgid   = current->fsgid;
583         rec->ul_cap     = cfs_curproc_cap_pack();
584         rec->ul_mode    = data->create_mode;
585         rec->ul_suppgid1= data->suppgids[0];
586         memcpy(&rec->ul_fid1, &data->fid1, sizeof(data->fid1));
587         memcpy(&rec->ul_fid2, &data->fid2, sizeof(data->fid2));
588         rec->ul_time    = data->mod_time;
589
590         /* NULL capa is skipped. */
591
592         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, data->namelen + 1);
593         LASSERT (tmp != NULL);
594         LOGL0(data->name, data->namelen, tmp);
595         EXIT;
596 }
597
598 void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
599                      struct mdc_op_data *data)
600 {
601         if (mdc_req_is_2_0_server(req))
602                 mdc_unlink_pack_20(req, offset, data);
603         else
604                 mdc_unlink_pack_18(req, offset, data);
605 }
606 static void mdc_link_pack_18(struct ptlrpc_request *req, int offset,
607                              struct mdc_op_data *data)
608 {
609         struct mds_rec_link *rec;
610         char *tmp;
611         ENTRY;
612
613         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
614
615         rec->lk_opcode = REINT_LINK;
616         rec->lk_fsuid = current->fsuid;
617         rec->lk_fsgid = current->fsgid;
618         rec->lk_cap = cfs_curproc_cap_pack();
619         rec->lk_suppgid1 = data->suppgids[0];
620         rec->lk_suppgid2 = data->suppgids[1];
621         rec->lk_fid1 = data->fid1;
622         rec->lk_fid2 = data->fid2;
623         rec->lk_time = data->mod_time;
624
625         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, data->namelen + 1);
626         LOGL0(data->name, data->namelen, tmp);
627         EXIT;
628 }
629
630 static void mdc_link_pack_20(struct ptlrpc_request *req, int offset,
631                              struct mdc_op_data *data)
632 {
633         struct mdt_rec_link *rec;
634         char *tmp;
635         ENTRY;
636
637         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
638
639         rec->lk_opcode   = REINT_LINK;
640         rec->lk_fsuid    = current->fsuid;
641         rec->lk_fsgid    = current->fsgid;
642         rec->lk_cap      = cfs_curproc_cap_pack();
643         rec->lk_suppgid1 = data->suppgids[0];
644         rec->lk_suppgid2 = data->suppgids[1];
645         memcpy(&rec->lk_fid1, &data->fid1, sizeof(data->fid1));
646         memcpy(&rec->lk_fid2, &data->fid2, sizeof(data->fid2));
647         rec->lk_time     = data->mod_time;
648
649
650         /* capa @ offset + 1; */
651         /* capa @ offset + 2; */
652
653         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, data->namelen + 1);
654         LOGL0(data->name, data->namelen, tmp);
655         EXIT;
656 }
657
658 void mdc_link_pack(struct ptlrpc_request *req, int offset,
659                    struct mdc_op_data *data)
660 {
661         if (mdc_req_is_2_0_server(req))
662                 mdc_link_pack_20(req, offset, data);
663         else
664                 mdc_link_pack_18(req, offset, data);
665 }
666
667 static void mdc_rename_pack_18(struct ptlrpc_request *req, int offset,
668                                struct mdc_op_data *data, const char *old, 
669                                int oldlen, const char *new, int newlen)
670 {
671         struct mds_rec_rename *rec;
672         char *tmp;
673         ENTRY;
674
675         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
676
677         /* XXX do something about time, uid, gid */
678         rec->rn_opcode = REINT_RENAME;
679         rec->rn_fsuid = current->fsuid;
680         rec->rn_fsgid = current->fsgid;
681         rec->rn_cap = cfs_curproc_cap_pack();
682         rec->rn_suppgid1 = data->suppgids[0];
683         rec->rn_suppgid2 = data->suppgids[1];
684         rec->rn_fid1 = data->fid1;
685         rec->rn_fid2 = data->fid2;
686         rec->rn_time = data->mod_time;
687
688         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, oldlen + 1);
689         LOGL0(old, oldlen, tmp);
690
691         if (new) {
692                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, newlen + 1);
693                 LOGL0(new, newlen, tmp);
694         }
695         EXIT;
696 }
697
698 static void mdc_rename_pack_20(struct ptlrpc_request *req, int offset,
699                                struct mdc_op_data *data, const char *old,
700                                int oldlen, const char *new, int newlen)
701 {
702         struct mdt_rec_rename *rec;
703         char *tmp;
704         ENTRY;
705
706         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
707
708         /* XXX do something about time, uid, gid */
709         rec->rn_opcode   = REINT_RENAME;
710         rec->rn_fsuid    = current->fsuid;
711         rec->rn_fsgid    = current->fsgid;
712         rec->rn_cap      = cfs_curproc_cap_pack();
713         rec->rn_suppgid1 = data->suppgids[0];
714         rec->rn_suppgid2 = data->suppgids[1];
715         memcpy(&rec->rn_fid1, &data->fid1, sizeof(data->fid1));
716         memcpy(&rec->rn_fid2, &data->fid2, sizeof(data->fid2));
717         rec->rn_time     = data->mod_time;
718         rec->rn_mode     = data->create_mode;
719
720
721         /* skip capa @ offset + 1 */
722         /* skip capa @ offset + 2 */
723
724         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, oldlen + 1);
725         LOGL0(old, oldlen, tmp);
726
727         if (new) {
728                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, newlen + 1);
729                 LOGL0(new, newlen, tmp);
730         }
731         EXIT;
732 }
733
734 void mdc_rename_pack(struct ptlrpc_request *req, int offset,
735                      struct mdc_op_data *data, const char *old,
736                      int oldlen, const char *new, int newlen)
737 {
738         if (mdc_req_is_2_0_server(req))
739                 mdc_rename_pack_20(req, offset, data, old, oldlen, new, newlen);
740         else
741                 mdc_rename_pack_18(req, offset, data, old, oldlen, new, newlen);
742 }
743
744 static void mdc_getattr_pack_18(struct ptlrpc_request *req, int offset,
745                                 __u64 valid, int flags, struct mdc_op_data *data)
746 {
747         struct mds_body *b;
748         ENTRY;
749
750         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
751
752         b->fsuid = current->fsuid;
753         b->fsgid = current->fsgid;
754         b->capability = cfs_curproc_cap_pack();
755         b->valid = valid;
756         b->flags = flags | MDS_BFLAG_EXT_FLAGS;
757         /* skip MDS_BFLAG_EXT_FLAGS to verify the "client < 1.4.7" case 
758          * refer to bug 12848.
759          */
760         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_OLD_EXT_FLAGS))
761                 b->flags &= ~MDS_BFLAG_EXT_FLAGS;
762         b->suppgid = data->suppgids[0];
763
764         b->fid1 = data->fid1;
765         b->fid2 = data->fid2;
766         if (data->name) {
767                 char *tmp;
768                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
769                                      data->namelen + 1);
770                 memcpy(tmp, data->name, data->namelen);
771                 data->name = tmp;
772         }
773         EXIT;
774 }
775
776 static void mdc_getattr_pack_20(struct ptlrpc_request *req, int offset,
777                                 __u64 valid, int flags, struct mdc_op_data *data)
778 {
779         struct mdt_body *b;
780         ENTRY;
781
782         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*b));
783
784         b->fsuid = current->fsuid;
785         b->fsgid = current->fsgid;
786         b->capability = cfs_curproc_cap_pack();
787         b->valid = valid;
788         b->flags = flags | MDS_BFLAG_EXT_FLAGS;
789         b->suppgid = data->suppgids[0];
790
791         memcpy(&b->fid1, &data->fid1, sizeof(data->fid1));
792         memcpy(&b->fid2, &data->fid2, sizeof(data->fid2));
793         b->valid |= OBD_MD_FLID;
794         if (data->name) {
795                 char *tmp;
796                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2,
797                                      data->namelen + 1);
798                 LASSERT(tmp);
799                 LOGL0(data->name, data->namelen, tmp);
800         }
801         EXIT;
802 }
803
804 void mdc_getattr_pack(struct ptlrpc_request *req, int offset,
805                       __u64 valid, int flags, struct mdc_op_data *data)
806 {
807         if (mdc_req_is_2_0_server(req))
808                 mdc_getattr_pack_20(req, offset, valid, flags, data);
809         else
810                 mdc_getattr_pack_18(req, offset, valid, flags, data);
811 }
812 static void mdc_close_pack_18(struct ptlrpc_request *req, int offset,
813                               struct mdc_op_data *data,
814                               struct obdo *oa, __u64 valid,
815                               struct obd_client_handle *och)
816 {
817         struct mds_body *body;
818         ENTRY;
819
820         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
821
822         body->fid1 = data->fid1;
823         memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
824         if (oa->o_valid & OBD_MD_FLATIME) {
825                 body->atime = oa->o_atime;
826                 body->valid |= OBD_MD_FLATIME;
827         }
828         if (oa->o_valid & OBD_MD_FLMTIME) {
829                 body->mtime = oa->o_mtime;
830                 body->valid |= OBD_MD_FLMTIME;
831         }
832         if (oa->o_valid & OBD_MD_FLCTIME) {
833                 body->ctime = oa->o_ctime;
834                 body->valid |= OBD_MD_FLCTIME;
835         }
836         if (oa->o_valid & OBD_MD_FLSIZE) {
837                 body->size = oa->o_size;
838                 body->valid |= OBD_MD_FLSIZE;
839         }
840         if (oa->o_valid & OBD_MD_FLBLOCKS) {
841                 body->blocks = oa->o_blocks;
842                 body->valid |= OBD_MD_FLBLOCKS;
843         }
844         if (oa->o_valid & OBD_MD_FLFLAGS) {
845                 body->flags = oa->o_flags;
846                 body->valid |= OBD_MD_FLFLAGS;
847         }
848         EXIT;
849 }
850
851 static void mdc_close_pack_20(struct ptlrpc_request *req, int offset,
852                               struct mdc_op_data *data,
853                               struct obdo *oa, __u64 valid,
854                               struct obd_client_handle *och)
855 {
856         struct mdt_epoch *epoch;
857         struct mdt_rec_setattr *rec;
858         ENTRY;
859
860         epoch = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*epoch));
861         rec = lustre_msg_buf(req->rq_reqmsg, offset + 1, sizeof(*rec));
862
863         rec->sa_opcode  = REINT_SETATTR;
864         rec->sa_fsuid   = current->fsuid;
865         rec->sa_fsgid   = current->fsgid;
866         rec->sa_cap     = cfs_curproc_cap_pack();
867         rec->sa_suppgid = -1;
868
869         memcpy(&rec->sa_fid, &data->fid1, sizeof(data->fid1));
870
871         if (oa->o_valid & OBD_MD_FLATIME) {
872                 rec->sa_atime = oa->o_atime;
873                 rec->sa_valid |= MDS_ATTR_ATIME;
874         }
875         if (oa->o_valid & OBD_MD_FLMTIME) {
876                 rec->sa_mtime = oa->o_mtime;
877                 rec->sa_valid |= MDS_ATTR_MTIME;
878         }
879         if (oa->o_valid & OBD_MD_FLCTIME) {
880                 rec->sa_ctime = oa->o_ctime;
881                 rec->sa_valid |= MDS_ATTR_CTIME;
882         }
883         if (oa->o_valid & OBD_MD_FLSIZE) {
884                 rec->sa_size = oa->o_size;
885                 rec->sa_valid |= MDS_ATTR_SIZE;
886         }
887         if (oa->o_valid & OBD_MD_FLBLOCKS) {
888                 rec->sa_blocks = oa->o_blocks;
889                 rec->sa_valid |= MDS_ATTR_BLOCKS;
890         }
891         if (oa->o_valid & OBD_MD_FLFLAGS) {
892                 rec->sa_attr_flags = oa->o_flags;
893                 rec->sa_valid |= MDS_ATTR_ATTR_FLAG;
894         }
895
896         epoch->handle = och->och_fh;
897         epoch->ioepoch = 0;
898         epoch->flags = 0;
899
900         EXIT;
901 }
902
903
904 void mdc_close_pack(struct ptlrpc_request *req, int offset,
905                     struct mdc_op_data *data,
906                     struct obdo *oa, __u64 valid,
907                     struct obd_client_handle *och)
908 {
909         if (mdc_req_is_2_0_server(req))
910                 mdc_close_pack_20(req, offset, data, oa, valid, och);
911         else
912                 mdc_close_pack_18(req, offset, data, oa, valid, och);
913 }
914 struct mdc_cache_waiter {
915         struct list_head        mcw_entry;
916         wait_queue_head_t       mcw_waitq;
917 };
918
919 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
920 {
921         int rc;
922         ENTRY;
923         spin_lock(&cli->cl_loi_list_lock);
924         rc = list_empty(&mcw->mcw_entry);
925         spin_unlock(&cli->cl_loi_list_lock);
926         RETURN(rc);
927 };
928
929 /* We record requests in flight in cli->cl_r_in_flight here.
930  * There is only one write rpc possible in mdc anyway. If this to change
931  * in the future - the code may need to be revisited. */
932 void mdc_enter_request(struct client_obd *cli)
933 {
934         struct mdc_cache_waiter mcw;
935         struct l_wait_info lwi = { 0 };
936
937         spin_lock(&cli->cl_loi_list_lock);
938         if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
939                 list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
940                 init_waitqueue_head(&mcw.mcw_waitq);
941                 spin_unlock(&cli->cl_loi_list_lock);
942                 l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
943         } else {
944                 cli->cl_r_in_flight++;
945                 spin_unlock(&cli->cl_loi_list_lock);
946         }
947 }
948
949 void mdc_exit_request(struct client_obd *cli)
950 {
951         struct list_head *l, *tmp;
952         struct mdc_cache_waiter *mcw;
953
954         spin_lock(&cli->cl_loi_list_lock);
955         cli->cl_r_in_flight--;
956
957         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
958                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
959                         /* No free request slots anymore */
960                         break;
961                 }
962
963                 mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
964                 list_del_init(&mcw->mcw_entry);
965                 cli->cl_r_in_flight++;
966                 wake_up(&mcw->mcw_waitq);
967         }
968         /* Empty waiting list? Decrease reqs in-flight number */
969
970         spin_unlock(&cli->cl_loi_list_lock);
971 }