Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / cmm / mdc_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/mdc_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34 #include <obd_support.h>
35 #include <lustre_lib.h>
36 #include <obd_class.h>
37 #include <lustre_mdc.h>
38 #include "cmm_internal.h"
39 #include "mdc_internal.h"
40
41 static struct md_object_operations mdc_mo_ops;
42 static struct md_dir_operations mdc_dir_ops;
43 static struct lu_object_operations mdc_obj_ops;
44
45 extern struct lu_context_key mdc_thread_key;
46
47 struct lu_object *mdc_object_alloc(const struct lu_env *env,
48                                    const struct lu_object_header *hdr,
49                                    struct lu_device *ld)
50 {
51         struct mdc_object *mco;
52         ENTRY;
53
54         OBD_ALLOC_PTR(mco);
55         if (mco != NULL) {
56                 struct lu_object *lo;
57
58                 lo = &mco->mco_obj.mo_lu;
59                 lu_object_init(lo, NULL, ld);
60                 mco->mco_obj.mo_ops = &mdc_mo_ops;
61                 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
62                 lo->lo_ops = &mdc_obj_ops;
63                 RETURN(lo);
64         } else
65                 RETURN(NULL);
66 }
67
68 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
69 {
70         struct mdc_object *mco = lu2mdc_obj(lo);
71         lu_object_fini(lo);
72         OBD_FREE_PTR(mco);
73 }
74
75 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo)
76 {
77         ENTRY;
78         lo->lo_header->loh_attr |= LOHA_REMOTE;
79         RETURN(0);
80 }
81
82 static int mdc_object_print(const struct lu_env *env, void *cookie,
83                             lu_printer_t p, const struct lu_object *lo)
84 {
85         return (*p)(env, cookie, LUSTRE_CMM_MDC_NAME"-object@%p", lo);
86 }
87
88 static struct lu_object_operations mdc_obj_ops = {
89         .loo_object_init    = mdc_object_init,
90         .loo_object_free    = mdc_object_free,
91         .loo_object_print   = mdc_object_print,
92 };
93
94 /* md_object_operations */
95 static
96 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
97 {
98         struct mdc_thread_info *mci;
99
100         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
101         LASSERT(mci);
102         return mci;
103 }
104
105 static
106 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
107 {
108         struct mdc_thread_info *mci;
109
110         mci = mdc_info_get(env);
111
112         memset(mci, 0, sizeof(*mci));
113
114         return mci;
115 }
116
117 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
118 {
119         struct lu_attr *la = &ma->ma_attr;
120         /* update time */
121         if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
122                 la->la_ctime = body->ctime;
123                 if (body->valid & OBD_MD_FLMTIME)
124                         la->la_mtime = body->mtime;
125         }
126
127         if (body->valid & OBD_MD_FLMODE)
128                 la->la_mode = body->mode;
129         if (body->valid & OBD_MD_FLSIZE)
130                 la->la_size = body->size;
131         if (body->valid & OBD_MD_FLBLOCKS)
132                 la->la_blocks = body->blocks;
133         if (body->valid & OBD_MD_FLUID)
134                 la->la_uid = body->uid;
135         if (body->valid & OBD_MD_FLGID)
136                 la->la_gid = body->gid;
137         if (body->valid & OBD_MD_FLFLAGS)
138                 la->la_flags = body->flags;
139         if (body->valid & OBD_MD_FLNLINK)
140                 la->la_nlink = body->nlink;
141         if (body->valid & OBD_MD_FLRDEV)
142                 la->la_rdev = body->rdev;
143
144         ma->ma_valid = MA_INODE;
145 }
146
147 static int mdc_req2attr_update(const struct lu_env *env,
148                                 struct md_attr *ma)
149 {
150         struct mdc_thread_info *mci;
151         struct ptlrpc_request *req;
152         struct mdt_body *body;
153         struct lov_mds_md *lov;
154         struct llog_cookie *cookie;
155
156         ENTRY;
157         mci = mdc_info_get(env);
158         req = mci->mci_req;
159         LASSERT(req);
160         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
161         LASSERT(body);
162         mdc_body2attr(body, ma);
163
164         if (!(body->valid & OBD_MD_FLEASIZE))
165                 RETURN(0);
166
167         if (body->eadatasize == 0) {
168                 CERROR("OBD_MD_FLEASIZE is set but eadatasize is zero\n");
169                 RETURN(-EPROTO);
170         }
171
172         lov = lustre_swab_repbuf(req, REPLY_REC_OFF + 1,
173                                  body->eadatasize, NULL);
174         if (lov == NULL) {
175                 CERROR("Can't unpack MDS EA data\n");
176                 RETURN(-EPROTO);
177         }
178
179         LASSERT(ma->ma_lmm != NULL);
180         LASSERT(ma->ma_lmm_size >= body->eadatasize); 
181         ma->ma_lmm_size = body->eadatasize;
182         memcpy(ma->ma_lmm, lov, ma->ma_lmm_size);
183         ma->ma_valid |= MA_LOV;
184         if (!(body->valid & OBD_MD_FLCOOKIE))
185                 RETURN(0);
186
187         if (body->aclsize == 0) {
188                 CERROR("OBD_MD_FLCOOKIE is set but cookie size is zero\n");
189                 RETURN(-EPROTO);
190         }
191
192         cookie = lustre_msg_buf(req->rq_repmsg,
193                                 REPLY_REC_OFF + 2, body->aclsize);
194         if (cookie == NULL) {
195                 CERROR("Can't unpack unlink cookie data\n");
196                 RETURN(-EPROTO);
197         }
198
199         LASSERT(ma->ma_cookie != NULL);
200         LASSERT(ma->ma_cookie_size == body->aclsize);
201         memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
202         ma->ma_valid |= MA_COOKIE;
203         RETURN(0);
204 }
205
206 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
207                         struct md_attr *ma)
208 {
209         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
210         struct mdc_thread_info *mci;
211         int rc;
212         ENTRY;
213
214         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
215         LASSERT(mci);
216
217         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
218
219         rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), NULL,
220                         OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
221                         OBD_MD_FLFLAGS,
222                         0, &mci->mci_req);
223         if (rc == 0) {
224                 /* get attr from request */
225                 rc = mdc_req2attr_update(env, ma);
226         }
227
228         ptlrpc_req_finished(mci->mci_req);
229
230         RETURN(rc);
231 }
232
233
234 static int mdc_object_create(const struct lu_env *env,
235                              struct md_object *mo,
236                              const struct md_create_spec *spec,
237                              struct md_attr *ma)
238 {
239         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
240         struct lu_attr *la = &ma->ma_attr;
241         struct mdc_thread_info *mci;
242         const void *symname;
243         struct md_ucred *uc = md_ucred(env);
244         int rc, symlen;
245         uid_t uid;
246         gid_t gid;
247         __u32 cap;
248         ENTRY;
249
250         LASSERT(spec->u.sp_pfid != NULL);
251         mci = mdc_info_init(env);
252         mci->mci_opdata.fid2 = *lu_object_fid(&mo->mo_lu);
253         /* parent fid is needed to create dotdot on the remote node */
254         mci->mci_opdata.fid1 = *(spec->u.sp_pfid);
255         mci->mci_opdata.mod_time = la->la_mtime;
256         if (uc &&
257             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
258                 uid = uc->mu_fsuid;
259                 if (la->la_mode & S_ISGID)
260                         gid = la->la_gid;
261                 else
262                         gid = uc->mu_fsgid;
263                 cap = uc->mu_cap;
264                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
265                         mci->mci_opdata.suppgids[0] = uc->mu_suppgids[0];
266                 else
267                         mci->mci_opdata.suppgids[0] = -1;
268         } else {
269                 uid = la->la_uid;
270                 gid = la->la_gid;
271                 cap = 0;
272                 mci->mci_opdata.suppgids[0] = -1;
273         }
274
275         /* get data from spec */
276         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
277                 symname = spec->u.sp_ea.eadata;
278                 symlen = spec->u.sp_ea.eadatalen;
279                 mci->mci_opdata.fid1 = *(spec->u.sp_ea.fid);
280                 mci->mci_opdata.flags |= MDS_CREATE_SLAVE_OBJ;
281 #ifdef CONFIG_FS_POSIX_ACL
282         } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
283                 symname = spec->u.sp_ea.eadata;
284                 symlen = spec->u.sp_ea.eadatalen;
285                 mci->mci_opdata.flags |= MDS_CREATE_RMT_ACL;
286 #endif
287         } else {
288                 symname = spec->u.sp_symname;
289                 symlen = symname ? strlen(symname) + 1 : 0;
290         }
291
292         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
293                        symname, symlen,
294                        la->la_mode, uid, gid, cap, la->la_rdev,
295                        &mci->mci_req);
296
297         if (rc == 0) {
298                 /* get attr from request */
299                 rc = mdc_req2attr_update(env, ma);
300         }
301
302         ptlrpc_req_finished(mci->mci_req);
303
304         RETURN(rc);
305 }
306
307 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo)
308 {
309         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
310         struct mdc_thread_info *mci;
311         struct md_ucred *uc = md_ucred(env);
312         int rc;
313         ENTRY;
314
315         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
316         LASSERT(mci);
317
318         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
319         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
320         //mci->mci_opdata.mod_time = la->la_ctime;
321         //mci->mci_opdata.fsuid = la->la_uid;
322         //mci->mci_opdata.fsgid = la->la_gid;
323         mci->mci_opdata.mod_time = CURRENT_SECONDS;
324         if (uc &&
325             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
326                 mci->mci_opdata.fsuid = uc->mu_fsuid;
327                 mci->mci_opdata.fsgid = uc->mu_fsgid;
328                 mci->mci_opdata.cap = uc->mu_cap;
329                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
330                         mci->mci_opdata.suppgids[0] = uc->mu_suppgids[0];
331                         mci->mci_opdata.suppgids[1] = uc->mu_suppgids[1];
332                 } else {
333                         mci->mci_opdata.suppgids[0] =
334                                 mci->mci_opdata.suppgids[1] = -1;
335                 }
336         } else {
337                 mci->mci_opdata.fsuid = current->fsuid;
338                 mci->mci_opdata.fsgid = current->fsgid;
339                 mci->mci_opdata.cap = current->cap_effective;
340                 mci->mci_opdata.suppgids[0] = mci->mci_opdata.suppgids[1] = -1;
341         }
342
343
344         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
345
346         ptlrpc_req_finished(mci->mci_req);
347
348         RETURN(rc);
349 }
350
351 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
352                        struct md_attr *ma)
353 {
354         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
355         struct lu_attr *la = &ma->ma_attr;
356         struct mdc_thread_info *mci;
357         struct md_ucred *uc = md_ucred(env);
358         int rc;
359         ENTRY;
360
361         mci = mdc_info_init(env);
362         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
363         mci->mci_opdata.mode = la->la_mode;
364         mci->mci_opdata.mod_time = la->la_ctime;
365         if (uc &&
366             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
367                 mci->mci_opdata.fsuid = uc->mu_fsuid;
368                 mci->mci_opdata.fsgid = uc->mu_fsgid;
369                 mci->mci_opdata.cap = uc->mu_cap;
370                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
371                         mci->mci_opdata.suppgids[0] = uc->mu_suppgids[0];
372                 else
373                         mci->mci_opdata.suppgids[0] = -1;
374         } else {
375                 mci->mci_opdata.fsuid = la->la_uid;
376                 mci->mci_opdata.fsgid = la->la_gid;
377                 mci->mci_opdata.cap = current->cap_effective;
378                 mci->mci_opdata.suppgids[0] = -1;
379         }
380
381         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
382         if (rc == 0) {
383                 /* get attr from request */
384                 rc = mdc_req2attr_update(env, ma);
385         }
386
387         ptlrpc_req_finished(mci->mci_req);
388
389         RETURN(rc);
390 }
391
392 #ifdef HAVE_SPLIT_SUPPORT
393 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
394                   struct md_object *mo, struct page *page, __u32 offset)
395 {
396         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
397         int rc;
398         ENTRY;
399
400         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
401                           page, offset);
402         CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
403                page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
404         RETURN(rc);
405 }
406 #endif
407
408 static struct md_object_operations mdc_mo_ops = {
409         .moo_attr_get       = mdc_attr_get,
410         .moo_object_create  = mdc_object_create,
411         .moo_ref_add        = mdc_ref_add,
412         .moo_ref_del        = mdc_ref_del,
413 };
414
415 /* md_dir_operations */
416 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
417                           struct md_object *mo_t, const struct lu_fid *lf,
418                           const char *name, struct md_attr *ma)
419 {
420         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
421         struct lu_attr *la = &ma->ma_attr;
422         struct mdc_thread_info *mci;
423         struct md_ucred *uc = md_ucred(env);
424         int rc;
425         ENTRY;
426
427         mci = mdc_info_init(env);
428         mci->mci_opdata.fid1 = *lu_object_fid(&mo_p->mo_lu);
429         mci->mci_opdata.fid2 = *lf;
430         mci->mci_opdata.mode = la->la_mode;
431         mci->mci_opdata.mod_time = la->la_ctime;
432         if (uc &&
433             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
434                 mci->mci_opdata.fsuid = uc->mu_fsuid;
435                 mci->mci_opdata.fsgid = uc->mu_fsgid;
436                 mci->mci_opdata.cap = uc->mu_cap;
437                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
438                         mci->mci_opdata.suppgids[0] = uc->mu_suppgids[0];
439                         mci->mci_opdata.suppgids[1] = uc->mu_suppgids[1];
440                 } else {
441                         mci->mci_opdata.suppgids[0] =
442                                 mci->mci_opdata.suppgids[1] = -1;
443                 }
444         } else {
445                 mci->mci_opdata.fsuid = la->la_uid;
446                 mci->mci_opdata.fsgid = la->la_gid;
447                 mci->mci_opdata.cap = current->cap_effective;
448                 mci->mci_opdata.suppgids[0] = mci->mci_opdata.suppgids[1] = -1;
449         }
450
451         rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
452                        name, strlen(name), &mci->mci_req);
453         if (rc == 0) {
454                 /* get attr from request */
455                 mdc_req2attr_update(env, ma);
456         }
457
458         ptlrpc_req_finished(mci->mci_req);
459
460         RETURN(rc);
461 }
462
463 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
464                          const struct lu_fid *fid, struct lu_fid *sfid)
465 {
466         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
467         struct mdc_thread_info *mci;
468         struct mdt_body *body;
469         int rc;
470         ENTRY;
471
472         mci = mdc_info_init(env);
473
474         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
475                           fid, &mci->mci_req);
476         if (rc)
477                 GOTO(out, rc);
478
479         body = lustre_msg_buf(mci->mci_req->rq_repmsg, REPLY_REC_OFF,
480                               sizeof(*body));
481
482         LASSERT(body->valid & (OBD_MD_FLMODE | OBD_MD_FLID) &&
483                 (body->mode == 0 || body->mode == 1 || body->mode == EREMOTE));
484
485         rc = body->mode;
486         if (rc == EREMOTE) {
487                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "
488                        DFID"\n", PFID(&body->fid1));
489                 *sfid = body->fid1;
490                 rc = -EREMOTE;
491         }
492         EXIT;
493 out:
494         ptlrpc_req_finished(mci->mci_req);
495         return rc;
496 }
497
498 static struct md_dir_operations mdc_dir_ops = {
499         .mdo_is_subdir   = mdc_is_subdir,
500         .mdo_rename_tgt  = mdc_rename_tgt
501 };