Whamcloud - gitweb
LU-286 racer: general protection fault.
[fs/lustre-release.git] / lustre / mdc / mdc_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38 #ifndef __KERNEL__
39 # include <fcntl.h>
40 # include <liblustre.h>
41 #endif
42 #include <lustre_net.h>
43 #include <lustre/lustre_idl.h>
44 #include "mdc_internal.h"
45
46 #ifndef __KERNEL__
47 /* some liblustre hackings here */
48 #ifndef O_DIRECTORY
49 #define O_DIRECTORY     0
50 #endif
51 #endif
52
53 static void mdc_readdir_pack_18(struct ptlrpc_request *req, int offset,
54                                 __u64 pg_off, __u32 size, struct ll_fid *fid)
55 {
56         struct mds_body *b;
57         ENTRY;
58
59         CLASSERT(sizeof(struct ll_fid)   == sizeof(struct lu_fid));
60         CLASSERT(sizeof(struct mds_body) <= sizeof(struct mdt_body));
61         CLASSERT((int)offsetof(struct mds_body, max_cookiesize) == 
62                  (int)offsetof(struct mdt_body, max_cookiesize));
63
64
65         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
66         b->fsuid = cfs_curproc_fsuid();
67         b->fsgid = cfs_curproc_fsgid();
68         b->capability = cfs_curproc_cap_pack();
69         b->fid1 = *fid;
70         b->size = pg_off;                       /* !! */
71         b->suppgid = -1;
72         b->nlink = size;                        /* !! */
73         EXIT;
74 }
75
76 static void mdc_readdir_pack_20(struct ptlrpc_request *req, int offset,
77                                 __u64 pg_off, __u32 size, struct ll_fid *fid)
78 {
79         struct mdt_body *b;
80         ENTRY;
81
82         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
83         b->fsuid = cfs_curproc_fsuid();
84         b->fsgid = cfs_curproc_fsgid();
85         b->capability = cfs_curproc_cap_pack();
86
87         if (fid) {
88                 b->fid1 = *((struct lu_fid*)fid);
89                 b->valid |= OBD_MD_FLID;
90         }
91         b->size = pg_off;                       /* !! */
92         b->suppgid = -1;
93         b->nlink = size;                        /* !! */
94         b->mode = LUDA_FID | LUDA_TYPE;
95         EXIT;
96 }
97
98 void mdc_readdir_pack(struct ptlrpc_request *req, int offset,
99                       __u64 pg_off, __u32 size, struct ll_fid *fid)
100 {
101         if (mdc_req_is_2_0_server(req))
102                 mdc_readdir_pack_20(req, offset, pg_off, size, fid);
103         else
104                 mdc_readdir_pack_18(req, offset, pg_off, size, fid);
105 }
106
107 static void mdc_pack_req_body_18(struct ptlrpc_request *req, int offset,
108                                  __u64 valid, struct ll_fid *fid, int ea_size,
109                                  int flags)
110 {
111         struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
112         ENTRY;
113         LASSERT (b != NULL);
114
115         if (fid)
116                 b->fid1 = *fid;
117         b->valid = valid;
118         b->eadatasize = ea_size;
119         b->flags = flags;
120         b->fsuid = cfs_curproc_fsuid();
121         b->fsgid = cfs_curproc_fsgid();
122         b->capability = cfs_curproc_cap_pack();
123         EXIT;
124 }
125
126 static void mdc_pack_req_body_20(struct ptlrpc_request *req, int offset,
127                                  __u64 valid, struct ll_fid *fid, int ea_size,
128                                  int flags)
129 {
130         struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
131         ENTRY;
132         LASSERT (b != NULL);
133
134         b->valid      = valid;
135         b->eadatasize = ea_size;
136         b->flags      = flags;
137         if (fid) {
138                 b->fid1 = *((struct lu_fid*)fid);
139                 b->valid |= OBD_MD_FLID;
140         }
141
142         b->fsuid = cfs_curproc_fsuid();
143         b->fsgid = cfs_curproc_fsgid();
144         b->capability = cfs_curproc_cap_pack();
145         EXIT;
146 }
147
148 void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
149                        __u64 valid, struct ll_fid *fid, int ea_size,
150                        int flags)
151 {
152         if (mdc_req_is_2_0_server(req))
153                 mdc_pack_req_body_20(req, offset, valid, fid, ea_size, flags);
154         else
155                 mdc_pack_req_body_18(req, offset, valid, fid, ea_size, flags);
156 }
157
158 /* packing of MDS records */
159 static void mdc_create_pack_18(struct ptlrpc_request *req, int offset,
160                                struct mdc_op_data *op_data, const void *data,
161                                int datalen, __u32 mode, __u32 uid, __u32 gid,
162                                cfs_cap_t cap_effective, __u64 rdev)
163 {
164         struct mds_rec_create *rec;
165         char *tmp;
166         ENTRY;
167
168         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
169
170         rec->cr_opcode  = REINT_CREATE;
171         rec->cr_fsuid   = uid;
172         rec->cr_fsgid   = gid;
173         rec->cr_cap     = cap_effective;
174         rec->cr_fid     = op_data->fid1;
175         memset(&rec->cr_replayfid, 0, sizeof(rec->cr_replayfid));
176         rec->cr_mode    = mode;
177         rec->cr_rdev    = rdev;
178         rec->cr_time    = op_data->mod_time;
179         rec->cr_suppgid = op_data->suppgids[0];
180
181         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, op_data->namelen + 1);
182         LOGL0(op_data->name, op_data->namelen, tmp);
183
184         if (data) {
185                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, datalen);
186                 memcpy (tmp, data, datalen);
187         }
188         EXIT;
189 }
190
191 static void mdc_create_pack_20(struct ptlrpc_request *req, int offset,
192                                struct mdc_op_data *op_data, const void *data,
193                                int datalen, __u32 mode, __u32 uid, __u32 gid,
194                                cfs_cap_t cap_effective, __u64 rdev)
195 {
196         struct mdt_rec_create *rec;
197         char *tmp;
198         ENTRY;
199
200         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
201
202         rec->cr_opcode   = REINT_CREATE;
203         rec->cr_fsuid    = uid;
204         rec->cr_fsgid    = gid;
205         rec->cr_cap      = cap_effective;
206         memcpy(&rec->cr_fid1, &op_data->fid1, sizeof(op_data->fid1));
207         memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
208         rec->cr_mode     = mode;
209         rec->cr_rdev     = rdev;
210         rec->cr_time     = op_data->mod_time;
211         rec->cr_suppgid1 = op_data->suppgids[0];
212
213         /* offset + 1  == capa */
214         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, op_data->namelen + 1);
215         LOGL0(op_data->name, op_data->namelen, tmp);
216
217         if (data) {
218                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, datalen);
219                 memcpy(tmp, data, datalen);
220         }
221         EXIT;
222 }
223
224 void mdc_create_pack(struct ptlrpc_request *req, int offset,
225                      struct mdc_op_data *op_data, const void *data,
226                      int datalen, __u32 mode, __u32 uid, __u32 gid,
227                      cfs_cap_t cap_effective, __u64 rdev)
228 {
229         if (mdc_req_is_2_0_server(req))
230                 mdc_create_pack_20(req, offset, op_data, data, datalen,
231                                    mode, uid, gid, cap_effective, rdev);
232         else
233                 mdc_create_pack_18(req, offset, op_data, data, datalen,
234                                    mode, uid, gid, cap_effective, rdev);
235 }
236
237 static __u32 mds_pack_open_flags(__u32 flags, __u32 mode)
238 {
239         __u32 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
240                                    MDS_OPEN_DELAY_CREATE | MDS_OPEN_HAS_OBJS |
241                                    MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK));
242         if (flags & O_CREAT)
243                 cr_flags |= MDS_OPEN_CREAT;
244         if (flags & O_EXCL)
245                 cr_flags |= MDS_OPEN_EXCL;
246         if (flags & O_TRUNC)
247                 cr_flags |= MDS_OPEN_TRUNC;
248         if (flags & O_APPEND)
249                 cr_flags |= MDS_OPEN_APPEND;
250         if (flags & O_SYNC)
251                 cr_flags |= MDS_OPEN_SYNC;
252         if (flags & O_DIRECTORY)
253                 cr_flags |= MDS_OPEN_DIRECTORY;
254         if (mode  & M_JOIN_FILE)
255                 cr_flags |= MDS_OPEN_JOIN_FILE;
256 #ifdef FMODE_EXEC
257         if (flags & FMODE_EXEC)
258                 cr_flags |= MDS_FMODE_EXEC;
259 #endif
260         return cr_flags;
261 }
262
263 /* packing of MDS records */
264 static void mdc_join_pack_18(struct ptlrpc_request *req, int offset,
265                              struct mdc_op_data *op_data, __u64 head_size)
266 {
267         struct mds_rec_join *rec;
268         ENTRY;
269
270         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
271         LASSERT(rec != NULL);
272         rec->jr_fid = op_data->fid2;
273         rec->jr_headsize = head_size;
274         EXIT;
275 }
276
277 static void mdc_join_pack_20(struct ptlrpc_request *req, int offset,
278                              struct mdc_op_data *op_data, __u64 head_size)
279 {
280         struct mdt_rec_join *rec;
281         ENTRY;
282
283         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
284         LASSERT(rec != NULL);
285         memcpy(&rec->jr_fid, &op_data->fid2, sizeof(op_data->fid2));
286         rec->jr_headsize = head_size;
287         EXIT;
288 }
289
290 void mdc_join_pack(struct ptlrpc_request *req, int offset,
291                    struct mdc_op_data *op_data, __u64 head_size)
292 {
293         if (mdc_req_is_2_0_server(req))
294                 mdc_join_pack_20(req, offset, op_data, head_size);
295         else
296                 mdc_join_pack_18(req, offset, op_data, head_size);
297 }
298
299 static void mdc_open_pack_18(struct ptlrpc_request *req, int offset,
300                             struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
301                              __u32 flags, const void *lmm, int lmmlen)
302 {
303         struct mds_rec_create *rec;
304         char *tmp;
305         ENTRY;
306
307         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
308
309         /* XXX do something about time, uid, gid */
310         rec->cr_opcode  = REINT_OPEN;
311         rec->cr_fsuid   = cfs_curproc_fsuid();
312         rec->cr_fsgid   = cfs_curproc_fsgid();
313         rec->cr_cap     = cfs_curproc_cap_pack();
314         rec->cr_fid     = op_data->fid1;
315         memset(&rec->cr_replayfid, 0, sizeof(rec->cr_replayfid));
316         rec->cr_mode    = mode;
317         rec->cr_flags   = mds_pack_open_flags(flags, mode);
318         rec->cr_rdev    = rdev;
319         rec->cr_time    = op_data->mod_time;
320         rec->cr_suppgid = op_data->suppgids[0];
321
322         if (op_data->name) {
323                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
324                                      op_data->namelen + 1);
325                 LOGL0(op_data->name, op_data->namelen, tmp);
326         }
327
328         if (lmm) {
329                 rec->cr_flags |= MDS_OPEN_HAS_EA;
330 #ifndef __KERNEL__
331                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
332                 rec->cr_replayfid = op_data->fid2;
333 #endif
334                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, lmmlen);
335                 memcpy (tmp, lmm, lmmlen);
336         }
337         EXIT;
338 }
339
340 static void mdc_open_pack_20(struct ptlrpc_request *req, int offset,
341                             struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
342                              __u32 flags, const void *lmm, int lmmlen)
343 {
344         struct mdt_rec_create *rec;
345         char *tmp;
346         ENTRY;
347
348         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
349
350         /* XXX do something about time, uid, gid */
351         rec->cr_opcode = REINT_OPEN;
352         rec->cr_fsuid  = cfs_curproc_fsuid();
353         rec->cr_fsgid  = cfs_curproc_fsgid();
354         rec->cr_cap    = cfs_curproc_cap_pack();
355         memcpy(&rec->cr_fid1, &op_data->fid1, sizeof(op_data->fid1));
356         memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
357         rec->cr_mode   = mode;
358         rec->cr_flags  = mds_pack_open_flags(flags, mode);
359         rec->cr_rdev   = rdev;
360         rec->cr_time   = op_data->mod_time;
361         rec->cr_suppgid1 = op_data->suppgids[0];
362         rec->cr_suppgid2 = op_data->suppgids[1];
363
364         if (op_data->name) {
365                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3,
366                                      op_data->namelen + 1);
367                 CDEBUG(D_INFO, "offset=%d, src=%p(%d):%s, dst=%p\n",
368                         offset, op_data->name, op_data->namelen,
369                         op_data->name, tmp);
370                 LASSERT(tmp);
371                 LOGL0(op_data->name, op_data->namelen, tmp);
372         }
373
374         if (lmm) {
375                 rec->cr_flags |= MDS_OPEN_HAS_EA;
376 #ifndef __KERNEL__
377                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
378                 memcpy(&rec->cr_fid2, &op_data->fid2, sizeof(op_data->fid2));
379 #endif
380                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, lmmlen);
381                 memcpy(tmp, lmm, lmmlen);
382         }
383         EXIT;
384 }
385
386 void mdc_open_pack(struct ptlrpc_request *req, int offset,
387                    struct mdc_op_data *op_data, __u32 mode, __u64 rdev,
388                    __u32 flags, const void *lmm, int lmmlen)
389 {
390         if (mdc_req_is_2_0_server(req))
391                 mdc_open_pack_20(req, offset, op_data, mode, rdev,
392                                  flags, lmm, lmmlen);
393         else
394                 mdc_open_pack_18(req, offset, op_data, mode, rdev,
395                                  flags, lmm, lmmlen);
396
397 }
398
399 static inline __u64 attr_pack(unsigned int ia_valid) {
400         __u64 sa_valid = 0;
401
402         if (ia_valid & ATTR_MODE)
403                 sa_valid |= MDS_ATTR_MODE;
404         if (ia_valid & ATTR_UID)
405                 sa_valid |= MDS_ATTR_UID;
406         if (ia_valid & ATTR_GID)
407                 sa_valid |= MDS_ATTR_GID;
408         if (ia_valid & ATTR_SIZE)
409                 sa_valid |= MDS_ATTR_SIZE;
410         if (ia_valid & ATTR_ATIME)
411                 sa_valid |= MDS_ATTR_ATIME;
412         if (ia_valid & ATTR_MTIME)
413                 sa_valid |= MDS_ATTR_MTIME;
414         if (ia_valid & ATTR_CTIME)
415                 sa_valid |= MDS_ATTR_CTIME;
416         if (ia_valid & ATTR_ATIME_SET)
417                 sa_valid |= MDS_ATTR_ATIME_SET;
418         if (ia_valid & ATTR_MTIME_SET)
419                 sa_valid |= MDS_ATTR_MTIME_SET;
420         if (ia_valid & ATTR_FORCE)
421                 sa_valid |= MDS_ATTR_FORCE;
422         if (ia_valid & ATTR_ATTR_FLAG)
423                 sa_valid |= MDS_ATTR_ATTR_FLAG;
424         if (ia_valid & ATTR_KILL_SUID)
425                 sa_valid |=  MDS_ATTR_KILL_SUID;
426         if (ia_valid & ATTR_KILL_SGID)
427                 sa_valid |= MDS_ATTR_KILL_SGID;
428         if (ia_valid & ATTR_CTIME_SET)
429                 sa_valid |= MDS_ATTR_CTIME_SET;
430         if (ia_valid & ATTR_FROM_OPEN)
431                 sa_valid |= MDS_ATTR_FROM_OPEN;
432         if (ia_valid & MDS_OPEN_OWNEROVERRIDE)
433                 /* NFSD hack (see bug 5781) */
434                 sa_valid |= MDS_OPEN_OWNEROVERRIDE;
435         return sa_valid;
436 }
437
438 void mdc_setattr_pack_18(struct ptlrpc_request *req, int offset,
439                          struct mdc_op_data *data, struct iattr *iattr, void *ea,
440                          int ealen, void *ea2, int ea2len)
441 {
442         struct lov_user_md     *lum = NULL;
443         struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
444                                                      sizeof(*rec));
445         ENTRY;
446
447         rec->sa_opcode = REINT_SETATTR;
448         rec->sa_fsuid = cfs_curproc_fsuid();
449         rec->sa_fsgid = cfs_curproc_fsgid();
450         rec->sa_cap = cfs_curproc_cap_pack();
451         rec->sa_fid = data->fid1;
452         rec->sa_suppgid = -1;
453
454         if (iattr) {
455                 rec->sa_valid = attr_pack(iattr->ia_valid);
456                 rec->sa_mode = iattr->ia_mode;
457                 rec->sa_uid = iattr->ia_uid;
458                 rec->sa_gid = iattr->ia_gid;
459                 rec->sa_size = iattr->ia_size;
460                 rec->sa_atime = LTIME_S(iattr->ia_atime);
461                 rec->sa_mtime = LTIME_S(iattr->ia_mtime);
462                 rec->sa_ctime = LTIME_S(iattr->ia_ctime);
463                 rec->sa_attr_flags =
464                                ((struct ll_iattr_struct *)iattr)->ia_attr_flags;
465                 if ((iattr->ia_valid & ATTR_GID) && in_group_p(iattr->ia_gid))
466                         rec->sa_suppgid = iattr->ia_gid;
467                 else
468                         rec->sa_suppgid = data->suppgids[0];
469         }
470
471         if (ealen == 0) {
472                 EXIT;
473                 return;
474         }
475
476         lum = lustre_msg_buf(req->rq_reqmsg, offset + 1, ealen);
477         if (ea == NULL) { /* Remove LOV EA */
478                 lum->lmm_magic = LOV_USER_MAGIC_V1;
479                 lum->lmm_stripe_size = 0;
480                 lum->lmm_stripe_count = 0;
481                 lum->lmm_stripe_offset = (typeof(lum->lmm_stripe_offset))(-1);
482         } else {
483                 memcpy(lum, ea, ealen);
484         }
485
486         if (ea2len == 0) {
487                 EXIT;
488                 return;
489         }
490         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ea2len), ea2, ea2len);
491
492         EXIT;
493 }
494
495 static void mdc_setattr_pack_20(struct ptlrpc_request *req, int offset,
496                                 struct mdc_op_data *data, struct iattr *iattr,
497                                 void *ea, int ealen, void *ea2, int ea2len)
498 {
499         struct mdt_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
500                                                      sizeof(*rec));
501         struct lov_user_md     *lum = NULL;
502         ENTRY;
503
504         rec->sa_opcode  = REINT_SETATTR;
505         rec->sa_fsuid   = cfs_curproc_fsuid();
506         rec->sa_fsgid   = cfs_curproc_fsgid();
507         rec->sa_cap     = cfs_curproc_cap_pack();
508         memcpy(&rec->sa_fid, &data->fid1, sizeof(data->fid1));
509         rec->sa_suppgid = -1;
510
511         if (iattr) {
512                 rec->sa_valid   = attr_pack(iattr->ia_valid);
513                 rec->sa_mode    = iattr->ia_mode;
514                 rec->sa_uid     = iattr->ia_uid;
515                 rec->sa_gid     = iattr->ia_gid;
516                 rec->sa_size    = iattr->ia_size;
517 //              rec->sa_blocks  = iattr->ia_blocks;
518                 rec->sa_atime   = LTIME_S(iattr->ia_atime);
519                 rec->sa_mtime   = LTIME_S(iattr->ia_mtime);
520                 rec->sa_ctime   = LTIME_S(iattr->ia_ctime);
521                 rec->sa_attr_flags = 
522                         ((struct ll_iattr_struct *)iattr)->ia_attr_flags;
523                 if ((iattr->ia_valid & ATTR_GID) && in_group_p(iattr->ia_gid))
524                         rec->sa_suppgid = iattr->ia_gid;
525                 else
526                         rec->sa_suppgid = data->suppgids[0];
527         }
528         if (ealen == 0) {
529                 EXIT;
530                 return;
531         }
532         lum = lustre_msg_buf(req->rq_reqmsg, offset + 3, ealen);
533         if (ea == NULL) { /* Remove LOV EA */
534                 lum->lmm_magic = LOV_USER_MAGIC_V1;
535                 lum->lmm_stripe_size = 0;
536                 lum->lmm_stripe_count = 0;
537                 lum->lmm_stripe_offset = (typeof(lum->lmm_stripe_offset))(-1);
538         } else {
539                 memcpy(lum, ea, ealen);
540         }
541
542         if (ea2len == 0) {
543                 EXIT;
544                 return;
545         }
546         memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 4, ea2len), ea2, ea2len);
547         EXIT;
548 }
549
550 void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
551                       struct mdc_op_data *data, struct iattr *iattr,
552                       void *ea, int ealen, void *ea2, int ea2len)
553 {
554         if (mdc_req_is_2_0_server(req))
555                 mdc_setattr_pack_20(req, offset, data, iattr,
556                                     ea, ealen, ea2, ea2len);
557         else
558                 mdc_setattr_pack_18(req, offset, data, iattr,
559                                     ea, ealen, ea2, ea2len);
560 }
561
562 static void mdc_unlink_pack_18(struct ptlrpc_request *req, int offset,
563                                struct mdc_op_data *data)
564 {
565         struct mds_rec_unlink *rec;
566         char *tmp;
567         ENTRY;
568
569         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
570         LASSERT (rec != NULL);
571
572         rec->ul_opcode = REINT_UNLINK;
573         rec->ul_fsuid = cfs_curproc_fsuid();
574         rec->ul_fsgid = cfs_curproc_fsgid();
575         rec->ul_cap = cfs_curproc_cap_pack();
576         rec->ul_mode = data->create_mode;
577         rec->ul_suppgid = data->suppgids[0];
578         rec->ul_fid1 = data->fid1;
579         rec->ul_fid2 = data->fid2;
580         rec->ul_time = data->mod_time;
581
582         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, data->namelen + 1);
583         LASSERT (tmp != NULL);
584         LOGL0(data->name, data->namelen, tmp);
585         EXIT;
586 }
587
588 static void mdc_unlink_pack_20(struct ptlrpc_request *req, int offset,
589                                struct mdc_op_data *data)
590 {
591         struct mdt_rec_unlink *rec;
592         char *tmp;
593         ENTRY;
594
595         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
596         LASSERT (rec != NULL);
597
598         rec->ul_opcode  = REINT_UNLINK;
599         rec->ul_fsuid   = cfs_curproc_fsuid();
600         rec->ul_fsgid   = cfs_curproc_fsgid();
601         rec->ul_cap     = cfs_curproc_cap_pack();
602         rec->ul_mode    = data->create_mode;
603         rec->ul_suppgid1= data->suppgids[0];
604         memcpy(&rec->ul_fid1, &data->fid1, sizeof(data->fid1));
605         memcpy(&rec->ul_fid2, &data->fid2, sizeof(data->fid2));
606         rec->ul_time    = data->mod_time;
607
608         /* NULL capa is skipped. */
609
610         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, data->namelen + 1);
611         LASSERT (tmp != NULL);
612         LOGL0(data->name, data->namelen, tmp);
613         EXIT;
614 }
615
616 void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
617                      struct mdc_op_data *data)
618 {
619         if (mdc_req_is_2_0_server(req))
620                 mdc_unlink_pack_20(req, offset, data);
621         else
622                 mdc_unlink_pack_18(req, offset, data);
623 }
624 static void mdc_link_pack_18(struct ptlrpc_request *req, int offset,
625                              struct mdc_op_data *data)
626 {
627         struct mds_rec_link *rec;
628         char *tmp;
629         ENTRY;
630
631         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
632
633         rec->lk_opcode = REINT_LINK;
634         rec->lk_fsuid = cfs_curproc_fsuid();
635         rec->lk_fsgid = cfs_curproc_fsgid();
636         rec->lk_cap = cfs_curproc_cap_pack();
637         rec->lk_suppgid1 = data->suppgids[0];
638         rec->lk_suppgid2 = data->suppgids[1];
639         rec->lk_fid1 = data->fid1;
640         rec->lk_fid2 = data->fid2;
641         rec->lk_time = data->mod_time;
642
643         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, data->namelen + 1);
644         LOGL0(data->name, data->namelen, tmp);
645         EXIT;
646 }
647
648 static void mdc_link_pack_20(struct ptlrpc_request *req, int offset,
649                              struct mdc_op_data *data)
650 {
651         struct mdt_rec_link *rec;
652         char *tmp;
653         ENTRY;
654
655         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
656
657         rec->lk_opcode   = REINT_LINK;
658         rec->lk_fsuid    = cfs_curproc_fsuid();
659         rec->lk_fsgid    = cfs_curproc_fsgid();
660         rec->lk_cap      = cfs_curproc_cap_pack();
661         rec->lk_suppgid1 = data->suppgids[0];
662         rec->lk_suppgid2 = data->suppgids[1];
663         memcpy(&rec->lk_fid1, &data->fid1, sizeof(data->fid1));
664         memcpy(&rec->lk_fid2, &data->fid2, sizeof(data->fid2));
665         rec->lk_time     = data->mod_time;
666
667
668         /* capa @ offset + 1; */
669         /* capa @ offset + 2; */
670
671         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, data->namelen + 1);
672         LOGL0(data->name, data->namelen, tmp);
673         EXIT;
674 }
675
676 void mdc_link_pack(struct ptlrpc_request *req, int offset,
677                    struct mdc_op_data *data)
678 {
679         if (mdc_req_is_2_0_server(req))
680                 mdc_link_pack_20(req, offset, data);
681         else
682                 mdc_link_pack_18(req, offset, data);
683 }
684
685 static void mdc_rename_pack_18(struct ptlrpc_request *req, int offset,
686                                struct mdc_op_data *data, const char *old, 
687                                int oldlen, const char *new, int newlen)
688 {
689         struct mds_rec_rename *rec;
690         char *tmp;
691         ENTRY;
692
693         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
694
695         /* XXX do something about time, uid, gid */
696         rec->rn_opcode = REINT_RENAME;
697         rec->rn_fsuid = cfs_curproc_fsuid();
698         rec->rn_fsgid = cfs_curproc_fsgid();
699         rec->rn_cap = cfs_curproc_cap_pack();
700         rec->rn_suppgid1 = data->suppgids[0];
701         rec->rn_suppgid2 = data->suppgids[1];
702         rec->rn_fid1 = data->fid1;
703         rec->rn_fid2 = data->fid2;
704         rec->rn_time = data->mod_time;
705
706         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, oldlen + 1);
707         LOGL0(old, oldlen, tmp);
708
709         if (new) {
710                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, newlen + 1);
711                 LOGL0(new, newlen, tmp);
712         }
713         EXIT;
714 }
715
716 static void mdc_rename_pack_20(struct ptlrpc_request *req, int offset,
717                                struct mdc_op_data *data, const char *old,
718                                int oldlen, const char *new, int newlen)
719 {
720         struct mdt_rec_rename *rec;
721         char *tmp;
722         ENTRY;
723
724         rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
725
726         /* XXX do something about time, uid, gid */
727         rec->rn_opcode   = REINT_RENAME;
728         rec->rn_fsuid    = cfs_curproc_fsuid();
729         rec->rn_fsgid    = cfs_curproc_fsgid();
730         rec->rn_cap      = cfs_curproc_cap_pack();
731         rec->rn_suppgid1 = data->suppgids[0];
732         rec->rn_suppgid2 = data->suppgids[1];
733         memcpy(&rec->rn_fid1, &data->fid1, sizeof(data->fid1));
734         memcpy(&rec->rn_fid2, &data->fid2, sizeof(data->fid2));
735         rec->rn_time     = data->mod_time;
736         rec->rn_mode     = data->create_mode;
737
738
739         /* skip capa @ offset + 1 */
740         /* skip capa @ offset + 2 */
741
742         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, oldlen + 1);
743         LOGL0(old, oldlen, tmp);
744
745         if (new) {
746                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, newlen + 1);
747                 LOGL0(new, newlen, tmp);
748         }
749         EXIT;
750 }
751
752 void mdc_rename_pack(struct ptlrpc_request *req, int offset,
753                      struct mdc_op_data *data, const char *old,
754                      int oldlen, const char *new, int newlen)
755 {
756         if (mdc_req_is_2_0_server(req))
757                 mdc_rename_pack_20(req, offset, data, old, oldlen, new, newlen);
758         else
759                 mdc_rename_pack_18(req, offset, data, old, oldlen, new, newlen);
760 }
761
762 static void mdc_getattr_pack_18(struct ptlrpc_request *req, int offset,
763                                 __u64 valid, int flags,
764                                 struct mdc_op_data *data)
765 {
766         struct mds_body *b;
767         ENTRY;
768
769         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
770
771         b->fsuid = cfs_curproc_fsuid();
772         b->fsgid = cfs_curproc_fsgid();
773         b->capability = cfs_curproc_cap_pack();
774         b->valid = valid;
775         b->flags = flags | MDS_BFLAG_EXT_FLAGS;
776         /* skip MDS_BFLAG_EXT_FLAGS to verify the "client < 1.4.7" case 
777          * refer to bug 12848.
778          */
779         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_OLD_EXT_FLAGS))
780                 b->flags &= ~MDS_BFLAG_EXT_FLAGS;
781         b->suppgid = data->suppgids[0];
782
783         b->fid1 = data->fid1;
784         b->fid2 = data->fid2;
785         if (data->name) {
786                 char *tmp;
787                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
788                                      data->namelen + 1);
789                 memcpy(tmp, data->name, data->namelen);
790                 data->name = tmp;
791         }
792         EXIT;
793 }
794
795 static void mdc_getattr_pack_20(struct ptlrpc_request *req, int offset,
796                                 __u64 valid, int flags,
797                                 struct mdc_op_data *data, int ea_size)
798 {
799         struct mdt_body *b;
800         ENTRY;
801
802         b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*b));
803
804         b->fsuid = cfs_curproc_fsuid();
805         b->fsgid = cfs_curproc_fsgid();
806         b->capability = cfs_curproc_cap_pack();
807         b->valid = valid;
808         b->eadatasize = ea_size;
809         b->flags = flags | MDS_BFLAG_EXT_FLAGS;
810         b->suppgid = data->suppgids[0];
811
812         memcpy(&b->fid1, &data->fid1, sizeof(data->fid1));
813         memcpy(&b->fid2, &data->fid2, sizeof(data->fid2));
814         b->valid |= OBD_MD_FLID;
815         if (data->name) {
816                 char *tmp;
817                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2,
818                                      data->namelen + 1);
819                 LASSERT(tmp);
820                 LOGL0(data->name, data->namelen, tmp);
821         }
822         EXIT;
823 }
824
825 void mdc_getattr_pack(struct ptlrpc_request *req, int offset,
826                       __u64 valid, int flags,
827                       struct mdc_op_data *data, int ea_size)
828 {
829         if (mdc_req_is_2_0_server(req))
830                 mdc_getattr_pack_20(req, offset, valid, flags, data, ea_size);
831         else
832                 mdc_getattr_pack_18(req, offset, valid, flags, data);
833 }
834 static void mdc_close_pack_18(struct ptlrpc_request *req, int offset,
835                               struct mdc_op_data *data,
836                               struct obdo *oa, __u64 valid,
837                               struct obd_client_handle *och)
838 {
839         struct mds_body *body;
840         ENTRY;
841
842         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
843
844         body->fid1 = data->fid1;
845         memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
846         if (oa->o_valid & OBD_MD_FLATIME) {
847                 body->atime = oa->o_atime;
848                 body->valid |= OBD_MD_FLATIME;
849         }
850         if (oa->o_valid & OBD_MD_FLMTIME) {
851                 body->mtime = oa->o_mtime;
852                 body->valid |= OBD_MD_FLMTIME;
853         }
854         if (oa->o_valid & OBD_MD_FLCTIME) {
855                 body->ctime = oa->o_ctime;
856                 body->valid |= OBD_MD_FLCTIME;
857         }
858         if (oa->o_valid & OBD_MD_FLSIZE) {
859                 body->size = oa->o_size;
860                 body->valid |= OBD_MD_FLSIZE;
861         }
862         if (oa->o_valid & OBD_MD_FLBLOCKS) {
863                 body->blocks = oa->o_blocks;
864                 body->valid |= OBD_MD_FLBLOCKS;
865         }
866         if (oa->o_valid & OBD_MD_FLFLAGS) {
867                 body->flags = oa->o_flags;
868                 body->valid |= OBD_MD_FLFLAGS;
869         }
870         EXIT;
871 }
872
873 static void mdc_close_pack_20(struct ptlrpc_request *req, int offset,
874                               struct mdc_op_data *data,
875                               struct obdo *oa, __u64 valid,
876                               struct obd_client_handle *och)
877 {
878         struct mdt_epoch *epoch;
879         struct mdt_rec_setattr *rec;
880         ENTRY;
881
882         epoch = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*epoch));
883         rec = lustre_msg_buf(req->rq_reqmsg, offset + 1, sizeof(*rec));
884
885         rec->sa_opcode  = REINT_SETATTR;
886         rec->sa_fsuid   = cfs_curproc_fsuid();
887         rec->sa_fsgid   = cfs_curproc_fsgid();
888         rec->sa_cap     = cfs_curproc_cap_pack();
889         rec->sa_suppgid = -1;
890
891         memcpy(&rec->sa_fid, &data->fid1, sizeof(data->fid1));
892
893         if (oa->o_valid & OBD_MD_FLATIME) {
894                 rec->sa_atime = oa->o_atime;
895                 rec->sa_valid |= MDS_ATTR_ATIME;
896         }
897         if (oa->o_valid & OBD_MD_FLMTIME) {
898                 rec->sa_mtime = oa->o_mtime;
899                 rec->sa_valid |= MDS_ATTR_MTIME;
900         }
901         if (oa->o_valid & OBD_MD_FLCTIME) {
902                 rec->sa_ctime = oa->o_ctime;
903                 rec->sa_valid |= MDS_ATTR_CTIME;
904         }
905         if (oa->o_valid & OBD_MD_FLSIZE) {
906                 rec->sa_size = oa->o_size;
907                 rec->sa_valid |= MDS_ATTR_SIZE;
908         }
909         if (oa->o_valid & OBD_MD_FLBLOCKS) {
910                 rec->sa_blocks = oa->o_blocks;
911                 rec->sa_valid |= MDS_ATTR_BLOCKS;
912         }
913         if (oa->o_valid & OBD_MD_FLFLAGS) {
914                 rec->sa_attr_flags = oa->o_flags;
915                 rec->sa_valid |= MDS_ATTR_ATTR_FLAG;
916         }
917
918         epoch->handle = och->och_fh;
919         epoch->ioepoch = 0;
920         epoch->flags = 0;
921
922         EXIT;
923 }
924
925
926 void mdc_close_pack(struct ptlrpc_request *req, int offset,
927                     struct mdc_op_data *data,
928                     struct obdo *oa, __u64 valid,
929                     struct obd_client_handle *och)
930 {
931         if (mdc_req_is_2_0_server(req))
932                 mdc_close_pack_20(req, offset, data, oa, valid, och);
933         else
934                 mdc_close_pack_18(req, offset, data, oa, valid, och);
935 }
936 struct mdc_cache_waiter {
937         struct list_head        mcw_entry;
938         wait_queue_head_t       mcw_waitq;
939 };
940
941 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
942 {
943         int rc;
944         ENTRY;
945         spin_lock(&cli->cl_loi_list_lock);
946         rc = list_empty(&mcw->mcw_entry);
947         spin_unlock(&cli->cl_loi_list_lock);
948         RETURN(rc);
949 };
950
951 /* We record requests in flight in cli->cl_r_in_flight here.
952  * There is only one write rpc possible in mdc anyway. If this to change
953  * in the future - the code may need to be revisited. */
954 int mdc_enter_request(struct client_obd *cli)
955 {
956         int rc = 0;
957         struct mdc_cache_waiter mcw;
958         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
959
960         spin_lock(&cli->cl_loi_list_lock);
961         if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
962                 list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
963                 init_waitqueue_head(&mcw.mcw_waitq);
964                 spin_unlock(&cli->cl_loi_list_lock);
965                 rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw),
966                                   &lwi);
967                 if (rc) {
968                         spin_lock(&cli->cl_loi_list_lock);
969                         if (list_empty(&mcw.mcw_entry))
970                                 cli->cl_r_in_flight--;
971                         list_del_init(&mcw.mcw_entry);
972                         spin_unlock(&cli->cl_loi_list_lock);
973                 }
974         } else {
975                 cli->cl_r_in_flight++;
976                 spin_unlock(&cli->cl_loi_list_lock);
977         }
978         return rc;
979 }
980
981 void mdc_exit_request(struct client_obd *cli)
982 {
983         struct list_head *l, *tmp;
984         struct mdc_cache_waiter *mcw;
985
986         spin_lock(&cli->cl_loi_list_lock);
987         cli->cl_r_in_flight--;
988
989         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
990                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
991                         /* No free request slots anymore */
992                         break;
993                 }
994
995                 mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
996                 list_del_init(&mcw->mcw_entry);
997                 cli->cl_r_in_flight++;
998                 wake_up(&mcw->mcw_waitq);
999         }
1000         /* Empty waiting list? Decrease reqs in-flight number */
1001
1002         spin_unlock(&cli->cl_loi_list_lock);
1003 }