Whamcloud - gitweb
first fixes after code inspection.
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
38
39 /* XXX: fix later this hack. It exists because OSD produces fids with like this:
40    seq = ROOT_SEQ + 1, etc. */
41 static int cmm_special_fid(const struct lu_fid *fid)
42 {
43         struct lu_range *space = (struct lu_range *)&LUSTRE_SEQ_SPACE_RANGE;
44         return !range_within(space, fid_seq(fid));
45 }
46
47 static int cmm_fld_lookup(struct cmm_device *cm,
48                           const struct lu_fid *fid)
49 {
50         __u64 mds;
51         int rc;
52         ENTRY;
53
54         LASSERT(fid_is_sane(fid));
55
56         /* XXX: is this correct? We need this to prevent FLD lookups while CMM
57          * did not initialized yet all MDCs. */
58         if (cmm_special_fid(fid))
59                 mds = 0;
60         else {
61                 rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
62                 if (rc) {
63                         CERROR("can't find mds by seq "LPU64", rc %d\n",
64                                fid_seq(fid), rc);
65                         RETURN(rc);
66                 }
67         }
68         CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
69               mds, fid_seq(fid));
70
71         RETURN((int)mds);
72 }
73
74 static struct md_object_operations cml_mo_ops;
75 static struct md_dir_operations    cml_dir_ops;
76 static struct lu_object_operations cml_obj_ops;
77
78 static struct md_object_operations cmr_mo_ops;
79 static struct md_dir_operations    cmr_dir_ops;
80 static struct lu_object_operations cmr_obj_ops;
81
82 struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
83                                    const struct lu_object_header *loh,
84                                    struct lu_device *ld)
85 {
86         struct lu_object  *lo = NULL;
87         const struct lu_fid *fid = &loh->loh_fid;
88         struct cmm_device *cd;
89         int mdsnum;
90         ENTRY;
91
92         cd = lu2cmm_dev(ld);
93         if (cd->cmm_flags & CMM_INITIALIZED) {
94                 /* get object location */
95                 mdsnum = cmm_fld_lookup(lu2cmm_dev(ld), fid);
96                 if (mdsnum < 0)
97                         RETURN(ERR_PTR(mdsnum));
98         } else
99                 /*
100                  * Device is not yet initialized, cmm_object is being created
101                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
102                  * etc.). Such object *has* to be local.
103                  */
104                 mdsnum = cd->cmm_local_num;
105
106         /* select the proper set of operations based on object location */
107         if (mdsnum == cd->cmm_local_num) {
108                 struct cml_object *clo;
109
110                 OBD_ALLOC_PTR(clo);
111                 if (clo != NULL) {
112                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
113                         lu_object_init(lo, NULL, ld);
114                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
115                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
116                         lo->lo_ops = &cml_obj_ops;
117                         clo->cmm_obj.cmo_local = 1;
118                 }
119         } else {
120                 struct cmr_object *cro;
121
122                 OBD_ALLOC_PTR(cro);
123                 if (cro != NULL) {
124                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
125                         lu_object_init(lo, NULL, ld);
126                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
127                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
128                         lo->lo_ops = &cmr_obj_ops;
129                         cro->cmo_num = mdsnum;
130                         cro->cmm_obj.cmo_local = 0;
131                 }
132         }
133         RETURN(lo);
134 }
135
136 /*
137  * CMM has two types of objects - local and remote. They have different set
138  * of operations so we are avoiding multiple checks in code.
139  */
140
141 /*
142  * local CMM object operations. cml_...
143  */
144 static inline struct cml_object *lu2cml_obj(struct lu_object *o)
145 {
146         return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
147 }
148 static inline struct cml_object *md2cml_obj(struct md_object *mo)
149 {
150         return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
151 }
152 static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
153 {
154         return container_of0(co, struct cml_object, cmm_obj);
155 }
156 /* get local child device */
157 static struct lu_device *cml_child_dev(struct cmm_device *d)
158 {
159         return &d->cmm_child->md_lu_dev;
160 }
161
162 /* lu_object operations */
163 static void cml_object_free(const struct lu_context *ctx,
164                             struct lu_object *lo)
165 {
166         struct cml_object *clo = lu2cml_obj(lo);
167         lu_object_fini(lo);
168         OBD_FREE_PTR(clo);
169 }
170
171 static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
172 {
173         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
174         struct lu_device  *c_dev;
175         struct lu_object  *c_obj;
176         int rc;
177
178         ENTRY;
179
180         c_dev = cml_child_dev(cd);
181         if (c_dev == NULL) {
182                 rc = -ENOENT;
183         } else {
184                 c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
185                                                         lo->lo_header, c_dev);
186                 if (c_obj != NULL) {
187                         lu_object_add(lo, c_obj);
188                         rc = 0;
189                 } else {
190                         rc = -ENOMEM;
191                 }
192         }
193
194         RETURN(rc);
195 }
196
197 static int cml_object_exists(const struct lu_context *ctx,
198                              struct lu_object *lo)
199 {
200         return lu_object_exists(ctx, lu_object_next(lo));
201 }
202
203 static int cml_object_print(const struct lu_context *ctx,
204                             struct seq_file *f, const struct lu_object *lo)
205 {
206         return seq_printf(f, LUSTRE_CMM0_NAME"-local@%p", lo);
207 }
208
209 static struct lu_object_operations cml_obj_ops = {
210         .loo_object_init    = cml_object_init,
211         .loo_object_free    = cml_object_free,
212         .loo_object_print   = cml_object_print,
213         .loo_object_exists  = cml_object_exists
214 };
215
216 /* CMM local md_object operations */
217 static int cml_object_create(const struct lu_context *ctx,
218                              struct md_object *mo,
219                              struct lu_attr *attr)
220 {
221         int rc;
222         ENTRY;
223         rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
224         RETURN(rc);
225 }
226
227 static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
228                         struct lu_attr *attr)
229 {
230         int rc;
231         ENTRY;
232         rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
233         RETURN(rc);
234 }
235
236 static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
237                         struct lu_attr *attr)
238 {
239         int rc;
240         ENTRY;
241         rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
242         RETURN(rc);
243 }
244
245 static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
246                          void *buf, int buflen, const char *name)
247 {
248         int rc;
249         ENTRY;
250         rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
251                          buf, buflen, name);
252         RETURN(rc);
253 }
254
255 static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
256                          void *buf, int buflen, const char *name)
257 {
258         int rc;
259         ENTRY;
260         rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
261                           buf, buflen, name);
262         RETURN(rc);
263 }
264
265 static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
266 {
267         int rc;
268         ENTRY;
269         rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
270         RETURN(rc);
271 }
272
273 static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
274 {
275         int rc;
276         ENTRY;
277         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
278         RETURN(rc);
279 }
280
281 static int cml_open(const struct lu_context *ctx, struct md_object *mo)
282 {
283         int rc;
284         ENTRY;
285         rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
286         RETURN(rc);
287 }
288
289 static int cml_close(const struct lu_context *ctx, struct md_object *mo)
290 {
291         int rc;
292         ENTRY;
293         rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
294         RETURN(rc);
295 }
296
297 static struct md_object_operations cml_mo_ops = {
298         .moo_attr_get      = cml_attr_get,
299         .moo_attr_set      = cml_attr_set,
300         .moo_xattr_get     = cml_xattr_get,
301         .moo_xattr_set     = cml_xattr_set,
302         .moo_object_create = cml_object_create,
303         .moo_ref_add       = cml_ref_add,
304         .moo_ref_del       = cml_ref_del,
305         .moo_open          = cml_open,
306         .moo_close         = cml_close
307 };
308
309 /* md_dir operations */
310 static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
311                       const char *name, struct lu_fid *lf)
312 {
313         int rc;
314         ENTRY;
315         rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
316         RETURN(rc);
317
318 }
319
320 static int cml_create(const struct lu_context *ctx,
321                       struct md_object *mo_p, const char *name,
322                       struct md_object *mo_c, struct lu_attr *attr)
323 {
324         int rc;
325         ENTRY;
326         rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
327                         cmm2child_obj(md2cmm_obj(mo_c)), attr);
328         RETURN(rc);
329 }
330
331 static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
332                     struct md_object *mo_s, const char *name)
333 {
334         int rc;
335         ENTRY;
336         rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
337                       cmm2child_obj(md2cmm_obj(mo_s)), name);
338         RETURN(rc);
339 }
340
341 static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
342                       struct md_object *mo_c, const char *name)
343 {
344         int rc;
345         ENTRY;
346         rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
347                         cmm2child_obj(md2cmm_obj(mo_c)), name);
348         RETURN(rc);
349 }
350
351 static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
352                        struct md_object *mo_pn, const struct lu_fid *lf,
353                        const char *s_name, struct md_object *mo_t,
354                        const char *t_name)
355 {
356         int rc;
357         ENTRY;
358
359         if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
360                 /* remote object */
361                 rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
362                 if (rc)
363                         RETURN(rc);
364                 mo_t = NULL;
365         }
366
367         rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
368                         cmm2child_obj(md2cmm_obj(mo_pn)), lf, s_name,
369                         cmm2child_obj(md2cmm_obj(mo_t)), t_name);
370
371         RETURN(rc);
372 }
373
374 static int cml_rename_tgt(const struct lu_context *ctx,
375                           struct md_object *mo_p, struct md_object *mo_t,
376                           const struct lu_fid *lf, const char *name)
377 {
378         int rc;
379         ENTRY;
380
381         rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
382                             cmm2child_obj(md2cmm_obj(mo_t)), lf, name);
383         RETURN(rc);
384 }
385
386 static struct md_dir_operations cml_dir_ops = {
387         .mdo_lookup      = cml_lookup,
388         .mdo_create      = cml_create,
389         .mdo_link        = cml_link,
390         .mdo_unlink      = cml_unlink,
391         .mdo_rename      = cml_rename,
392         .mdo_rename_tgt  = cml_rename_tgt,
393 };
394
395 /* -------------------------------------------------------------------
396  * remote CMM object operations. cmr_...
397  */
398 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
399 {
400         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
401 }
402 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
403 {
404         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
405 }
406 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
407 {
408         return container_of0(co, struct cmr_object, cmm_obj);
409 }
410
411 /* get proper child device from MDCs */
412 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
413 {
414         struct lu_device *next = NULL;
415         struct mdc_device *mdc;
416
417         spin_lock(&d->cmm_tgt_guard);
418         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
419                 if (mdc->mc_num == num) {
420                         next = mdc2lu_dev(mdc);
421                         break;
422                 }
423         }
424         spin_unlock(&d->cmm_tgt_guard);
425         return next;
426 }
427
428 /* lu_object operations */
429 static void cmr_object_free(const struct lu_context *ctx,
430                             struct lu_object *lo)
431 {
432         struct cmr_object *cro = lu2cmr_obj(lo);
433         lu_object_fini(lo);
434         OBD_FREE_PTR(cro);
435 }
436
437 static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
438 {
439         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
440         struct lu_device  *c_dev;
441         struct lu_object  *c_obj;
442         int rc;
443
444         ENTRY;
445
446         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
447         if (c_dev == NULL) {
448                 rc = -ENOENT;
449         } else {
450                 c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
451                                                         lo->lo_header, c_dev);
452                 if (c_obj != NULL) {
453                         lu_object_add(lo, c_obj);
454                         rc = 0;
455                 } else {
456                         rc = -ENOMEM;
457                 }
458         }
459
460         RETURN(rc);
461 }
462
463 static int cmr_object_exists(const struct lu_context *ctx,
464                              struct lu_object *lo)
465 {
466         return lu_object_exists(ctx, lu_object_next(lo));
467 }
468
469 static int cmr_object_print(const struct lu_context *ctx,
470                             struct seq_file *f, const struct lu_object *lo)
471 {
472         return seq_printf(f, LUSTRE_CMM0_NAME"-remote@%p", lo);
473 }
474
475 static struct lu_object_operations cmr_obj_ops = {
476         .loo_object_init    = cmr_object_init,
477         .loo_object_free    = cmr_object_free,
478         .loo_object_print   = cmr_object_print,
479         .loo_object_exists  = cmr_object_exists
480 };
481
482 /* CMM remote md_object operations. All are invalid */
483 static int cmr_object_create(const struct lu_context *ctx,
484                              struct md_object *mo,
485                              struct lu_attr *attr)
486 {
487         RETURN(-EFAULT);
488 }
489
490 static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
491                         struct lu_attr *attr)
492 {
493         RETURN(-EREMOTE);
494 }
495
496 static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
497                         struct lu_attr *attr)
498 {
499         RETURN(-EFAULT);
500 }
501
502 static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
503                          void *buf, int buflen, const char *name)
504 {
505         RETURN(-EFAULT);
506 }
507
508 static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
509                          void *buf, int buflen, const char *name)
510 {
511         RETURN(-EFAULT);
512 }
513
514 static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
515 {
516         RETURN(-EFAULT);
517 }
518
519 static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
520 {
521         RETURN(-EFAULT);
522 }
523
524 static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
525 {
526         RETURN(-EREMOTE);
527 }
528
529 static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
530 {
531         RETURN(-EFAULT);
532 }
533
534 static struct md_object_operations cmr_mo_ops = {
535         .moo_attr_get      = cmr_attr_get,
536         .moo_attr_set      = cmr_attr_set,
537         .moo_xattr_get     = cmr_xattr_get,
538         .moo_xattr_set     = cmr_xattr_set,
539         .moo_object_create = cmr_object_create,
540         .moo_ref_add       = cmr_ref_add,
541         .moo_ref_del       = cmr_ref_del,
542         .moo_open          = cmr_open,
543         .moo_close         = cmr_close
544 };
545
546 /* remote part of md_dir operations */
547 static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
548                       const char *name, struct lu_fid *lf)
549 {
550         /*this can happens while rename()
551          * If new parent is remote dir, lookup will happens here */
552
553         RETURN(-EREMOTE);
554 }
555
556 static int cmr_create(const struct lu_context *ctx,
557                       struct md_object *mo_p, const char *name,
558                       struct md_object *mo_c, struct lu_attr *attr)
559 {
560         int rc;
561
562         ENTRY;
563
564         //XXX: make sure that MDT checks name isn't exist
565
566         /* remote object creation and local name insert */
567         rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
568         if (rc == 0) {
569                 rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
570                                      name, lu_object_fid(&mo_c->mo_lu));
571         }
572
573         RETURN(rc);
574 }
575
576 static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
577                     struct md_object *mo_s, const char *name)
578 {
579         int rc;
580         ENTRY;
581
582         //XXX: make sure that MDT checks name isn't exist
583
584         rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
585         if (rc == 0) {
586                 rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
587                                      name, lu_object_fid(&mo_s->mo_lu));
588         }
589
590         RETURN(rc);
591 }
592
593 static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
594                       struct md_object *mo_c, const char *name)
595 {
596         int rc;
597         ENTRY;
598
599         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
600         if (rc == 0) {
601                 rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
602                                      name);
603         }
604
605         RETURN(rc);
606 }
607
608 static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
609                        struct md_object *mo_pn, const struct lu_fid *lf,
610                        const char *s_name, struct md_object *mo_t,
611                        const char *t_name)
612 {
613         int rc;
614         ENTRY;
615
616         /* the mo_pn is remote directory, so we cannot even know if there is
617          * mo_t or not. Therefore mo_t is NULL here but remote server should do
618          * lookup and process this further */
619
620         LASSERT(mo_t == NULL);
621         rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_pn)),
622                             NULL/* mo_t */, lf, t_name);
623         /* only old name is removed localy */
624         if (rc == 0)
625                 rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
626                                      s_name);
627
628         RETURN(rc);
629 }
630
631 static int cmr_rename_tgt(const struct lu_context *ctx,
632                           struct md_object *mo_p, struct md_object *mo_t,
633                           const struct lu_fid *lf, const char *name)
634 {
635         int rc;
636         ENTRY;
637         /* target object is remote one */
638         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
639         /* continue locally with name handling only */
640         if (rc == 0)
641                 rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
642                                     NULL, lf, name);
643         RETURN(rc);
644 }
645
646 static struct md_dir_operations cmr_dir_ops = {
647         .mdo_lookup      = cmr_lookup,
648         .mdo_create      = cmr_create,
649         .mdo_link        = cmr_link,
650         .mdo_unlink      = cmr_unlink,
651         .mdo_rename      = cmr_rename,
652         .mdo_rename_tgt  = cmr_rename_tgt,
653 };
654
655