Whamcloud - gitweb
cmm: add ->cmm_flags to cmm_device to handle object creation during early bootstrappi...
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
38
39 /* XXX: fix layter this hack. It exists because OSD produces fids with like
40    this: seq = ROOT_SEQ + 1, etc. */
41 static int cmm_special_fid(const struct lu_fid *fid)
42 {
43         if (fid_seq(fid) < LUSTRE_SEQ_SPACE_START)
44                 return 1;
45         return 0;
46 }
47
48 static int cmm_fld_lookup(struct cmm_device *cm,
49                           const struct lu_fid *fid)
50 {
51         __u64 mds;
52         int rc;
53         ENTRY;
54         return 0;
55         LASSERT(fid_is_sane(fid));
56
57         /* XXX: is this correct? We need this to prevent FLD lookups while CMM
58          * did not initialized yet all MDCs. */
59         if (cmm_special_fid(fid))
60                 mds = 0;
61         else {
62                 rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), &mds);
63                 if (rc) {
64                         CERROR("can't find mds by seq "LPU64", rc %d\n",
65                                fid_seq(fid), rc);
66                         RETURN(rc);
67                 }
68         }
69         CWARN("CMM: got MDS "LPU64" for sequence: "LPU64"\n",
70               mds, fid_seq(fid));
71
72         RETURN((int)mds);
73 }
74
75 static struct md_object_operations cml_mo_ops;
76 static struct md_dir_operations    cml_dir_ops;
77 static struct lu_object_operations cml_obj_ops;
78
79 static struct md_object_operations cmr_mo_ops;
80 static struct md_dir_operations    cmr_dir_ops;
81 static struct lu_object_operations cmr_obj_ops;
82
83 struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
84                                    const struct lu_object_header *loh,
85                                    struct lu_device *ld)
86 {
87         struct lu_object  *lo = NULL;
88         const struct lu_fid *fid = &loh->loh_fid;
89         struct cmm_device *cd;
90         int mdsnum;
91         ENTRY;
92
93         cd = lu2cmm_dev(ld);
94         if (cd->cmm_flags & CMM_INITIALIZED) {
95                 /* get object location */
96                 mdsnum = cmm_fld_lookup(lu2cmm_dev(ld), fid);
97                 if (mdsnum < 0)
98                         RETURN(ERR_PTR(mdsnum));
99         } else
100                 /*
101                  * Device is not yet initialized, cmm_object is being created
102                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
103                  * etc.). Such object *has* to be local.
104                  */
105                 mdsnum = cd->cmm_local_num;
106
107         /* select the proper set of operations based on object location */
108         if (mdsnum == cd->cmm_local_num) {
109                 struct cml_object *clo;
110
111                 OBD_ALLOC_PTR(clo);
112                 if (clo != NULL) {
113                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
114                         lu_object_init(lo, NULL, ld);
115                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
116                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
117                         lo->lo_ops = &cml_obj_ops;
118                         clo->cmm_obj.cmo_local = 1;
119                 }
120         } else {
121                 struct cmr_object *cro;
122
123                 OBD_ALLOC_PTR(cro);
124                 if (cro != NULL) {
125                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
126                         lu_object_init(lo, NULL, ld);
127                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
128                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
129                         lo->lo_ops = &cmr_obj_ops;
130                         cro->cmo_num = mdsnum;
131                         cro->cmm_obj.cmo_local = 0;
132                 }
133         }
134         RETURN(lo);
135 }
136
137 /*
138  * CMM has two types of objects - local and remote. They have different set
139  * of operations so we are avoiding multiple checks in code.
140  */
141
142 /*
143  * local CMM object operations. cml_...
144  */
145 static inline struct cml_object *lu2cml_obj(struct lu_object *o)
146 {
147         return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
148 }
149 static inline struct cml_object *md2cml_obj(struct md_object *mo)
150 {
151         return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
152 }
153 static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
154 {
155         return container_of0(co, struct cml_object, cmm_obj);
156 }
157 /* get local child device */
158 static struct lu_device *cml_child_dev(struct cmm_device *d)
159 {
160         return &d->cmm_child->md_lu_dev;
161 }
162
163 /* lu_object operations */
164 static void cml_object_free(const struct lu_context *ctx,
165                             struct lu_object *lo)
166 {
167         struct cml_object *clo = lu2cml_obj(lo);
168         lu_object_fini(lo);
169         OBD_FREE_PTR(clo);
170 }
171
172 static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
173 {
174         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
175         struct lu_device  *c_dev;
176         struct lu_object  *c_obj;
177         int rc;
178
179         ENTRY;
180
181         c_dev = cml_child_dev(cd);
182         if (c_dev == NULL) {
183                 rc = -ENOENT;
184         } else {
185                 c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
186                                                         lo->lo_header, c_dev);
187                 if (c_obj != NULL) {
188                         lu_object_add(lo, c_obj);
189                         rc = 0;
190                 } else {
191                         rc = -ENOMEM;
192                 }
193         }
194
195         RETURN(rc);
196 }
197
198 static int cml_object_exists(const struct lu_context *ctx,
199                              struct lu_object *lo)
200 {
201         return lu_object_exists(ctx, lu_object_next(lo));
202 }
203
204 static int cml_object_print(const struct lu_context *ctx,
205                             struct seq_file *f, const struct lu_object *lo)
206 {
207         return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
208 }
209
210 static struct lu_object_operations cml_obj_ops = {
211         .loo_object_init    = cml_object_init,
212         .loo_object_free    = cml_object_free,
213         .loo_object_print   = cml_object_print,
214         .loo_object_exists  = cml_object_exists
215 };
216
217 /* CMM local md_object operations */
218 static int cml_object_create(const struct lu_context *ctx,
219                              struct md_object *mo,
220                              struct lu_attr *attr)
221 {
222         int rc;
223         ENTRY;
224         rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
225         RETURN(rc);
226 }
227
228 static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
229                         struct lu_attr *attr)
230 {
231         int rc;
232         ENTRY;
233         rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
234         RETURN(rc);
235 }
236
237 static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
238                         struct lu_attr *attr)
239 {
240         int rc;
241         ENTRY;
242         rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
243         RETURN(rc);
244 }
245
246 static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
247                          void *buf, int buflen, const char *name)
248 {
249         int rc;
250         ENTRY;
251         rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
252                          buf, buflen, name);
253         RETURN(rc);
254 }
255
256 static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
257                          void *buf, int buflen, const char *name)
258 {
259         int rc;
260         ENTRY;
261         rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
262                           buf, buflen, name);
263         RETURN(rc);
264 }
265
266 static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
267 {
268         int rc;
269         ENTRY;
270         rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
271         RETURN(rc);
272 }
273
274 static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
275 {
276         int rc;
277         ENTRY;
278         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
279         RETURN(rc);
280 }
281
282 static int cml_open(const struct lu_context *ctx, struct md_object *mo)
283 {
284         int rc;
285         ENTRY;
286         rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
287         RETURN(rc);
288 }
289
290 static int cml_close(const struct lu_context *ctx, struct md_object *mo)
291 {
292         int rc;
293         ENTRY;
294         rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
295         RETURN(rc);
296 }
297
298 static struct md_object_operations cml_mo_ops = {
299         .moo_attr_get      = cml_attr_get,
300         .moo_attr_set      = cml_attr_set,
301         .moo_xattr_get     = cml_xattr_get,
302         .moo_xattr_set     = cml_xattr_set,
303         .moo_object_create = cml_object_create,
304         .moo_ref_add       = cml_ref_add,
305         .moo_ref_del       = cml_ref_del,
306         .moo_open          = cml_open,
307         .moo_close         = cml_close
308 };
309
310 /* md_dir operations */
311 static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
312                       const char *name, struct lu_fid *lf)
313 {
314         int rc;
315         ENTRY;
316         rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
317         RETURN(rc);
318
319 }
320
321 static int cml_create(const struct lu_context *ctx,
322                       struct md_object *mo_p, const char *name,
323                       struct md_object *mo_c, struct lu_attr *attr)
324 {
325         int rc;
326         ENTRY;
327         rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
328                         cmm2child_obj(md2cmm_obj(mo_c)), attr);
329         RETURN(rc);
330 }
331
332 static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
333                     struct md_object *mo_s, const char *name)
334 {
335         int rc;
336         ENTRY;
337         rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
338                       cmm2child_obj(md2cmm_obj(mo_s)), name);
339         RETURN(rc);
340 }
341
342 static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
343                       struct md_object *mo_c, const char *name)
344 {
345         int rc;
346         ENTRY;
347         rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
348                         cmm2child_obj(md2cmm_obj(mo_c)), name);
349         RETURN(rc);
350 }
351
352 static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
353                        struct md_object *mo_pn, const struct lu_fid *lf,
354                        const char *s_name, struct md_object *mo_t,
355                        const char *t_name)
356 {
357         int rc;
358         ENTRY;
359
360         if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
361                 /* remote object */
362                 rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
363                 if (rc)
364                         RETURN(rc);
365                 mo_t = NULL;
366         }
367
368         rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
369                         cmm2child_obj(md2cmm_obj(mo_pn)), lf, s_name,
370                         cmm2child_obj(md2cmm_obj(mo_t)), t_name);
371
372         RETURN(rc);
373 }
374
375 static int cml_rename_tgt(const struct lu_context *ctx,
376                           struct md_object *mo_p, struct md_object *mo_t,
377                           const struct lu_fid *lf, const char *name)
378 {
379         int rc;
380         ENTRY;
381
382         rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
383                             cmm2child_obj(md2cmm_obj(mo_t)), lf, name);
384         RETURN(rc);
385 }
386
387 static struct md_dir_operations cml_dir_ops = {
388         .mdo_lookup      = cml_lookup,
389         .mdo_create      = cml_create,
390         .mdo_link        = cml_link,
391         .mdo_unlink      = cml_unlink,
392         .mdo_rename      = cml_rename,
393         .mdo_rename_tgt  = cml_rename_tgt,
394 };
395
396 /* -------------------------------------------------------------------
397  * remote CMM object operations. cmr_...
398  */
399 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
400 {
401         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
402 }
403 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
404 {
405         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
406 }
407 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
408 {
409         return container_of0(co, struct cmr_object, cmm_obj);
410 }
411
412 /* get local child device */
413 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
414 {
415         struct lu_device *next = NULL;
416         struct mdc_device *mdc;
417
418         spin_lock(&d->cmm_tgt_guard);
419         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
420                 if (mdc->mc_num == num) {
421                         next = mdc2lu_dev(mdc);
422                         break;
423                 }
424         }
425         spin_unlock(&d->cmm_tgt_guard);
426         return next;
427 }
428
429 /* lu_object operations */
430 static void cmr_object_free(const struct lu_context *ctx,
431                             struct lu_object *lo)
432 {
433         struct cmr_object *cro = lu2cmr_obj(lo);
434         lu_object_fini(lo);
435         OBD_FREE_PTR(cro);
436 }
437
438 static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
439 {
440         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
441         struct lu_device  *c_dev;
442         struct lu_object  *c_obj;
443         int rc;
444
445         ENTRY;
446
447         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
448         if (c_dev == NULL) {
449                 rc = -ENOENT;
450         } else {
451                 c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
452                                                         lo->lo_header, c_dev);
453                 if (c_obj != NULL) {
454                         lu_object_add(lo, c_obj);
455                         rc = 0;
456                 } else {
457                         rc = -ENOMEM;
458                 }
459         }
460
461         RETURN(rc);
462 }
463
464 static int cmr_object_exists(const struct lu_context *ctx,
465                              struct lu_object *lo)
466 {
467         return lu_object_exists(ctx, lu_object_next(lo));
468 }
469
470 static int cmr_object_print(const struct lu_context *ctx,
471                             struct seq_file *f, const struct lu_object *lo)
472 {
473         return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
474 }
475
476 static struct lu_object_operations cmr_obj_ops = {
477         .loo_object_init    = cmr_object_init,
478         .loo_object_free    = cmr_object_free,
479         .loo_object_print   = cmr_object_print,
480         .loo_object_exists  = cmr_object_exists
481 };
482
483 /* CMM remote md_object operations. All are invalid */
484 static int cmr_object_create(const struct lu_context *ctx,
485                              struct md_object *mo,
486                              struct lu_attr *attr)
487 {
488         RETURN(-EFAULT);
489 }
490
491 static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
492                         struct lu_attr *attr)
493 {
494         RETURN(-EREMOTE);
495 }
496
497 static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
498                         struct lu_attr *attr)
499 {
500         RETURN(-EFAULT);
501 }
502
503 static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
504                          void *buf, int buflen, const char *name)
505 {
506         RETURN(-EFAULT);
507 }
508
509 static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
510                          void *buf, int buflen, const char *name)
511 {
512         RETURN(-EFAULT);
513 }
514
515 static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
516 {
517         RETURN(-EFAULT);
518 }
519
520 static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
521 {
522         RETURN(-EFAULT);
523 }
524
525 static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
526 {
527         RETURN(-EREMOTE);
528 }
529
530 static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
531 {
532         RETURN(-EFAULT);
533 }
534
535 static struct md_object_operations cmr_mo_ops = {
536         .moo_attr_get      = cmr_attr_get,
537         .moo_attr_set      = cmr_attr_set,
538         .moo_xattr_get     = cmr_xattr_get,
539         .moo_xattr_set     = cmr_xattr_set,
540         .moo_object_create = cmr_object_create,
541         .moo_ref_add       = cmr_ref_add,
542         .moo_ref_del       = cmr_ref_del,
543         .moo_open          = cmr_open,
544         .moo_close         = cmr_close
545 };
546
547 /* remote part of md_dir operations */
548 static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
549                       const char *name, struct lu_fid *lf)
550 {
551         /*this can happens while rename()
552          * If new parent is remote dir, lookup will happens here */
553
554         RETURN(-EREMOTE);
555 }
556
557 static int cmr_create(const struct lu_context *ctx,
558                       struct md_object *mo_p, const char *name,
559                       struct md_object *mo_c, struct lu_attr *attr)
560 {
561         int rc;
562
563         ENTRY;
564
565         //XXX: make sure that MDT checks name isn't exist
566
567         /* remote object creation and local name insert */
568         rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
569         if (rc == 0) {
570                 rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
571                                      name, lu_object_fid(&mo_c->mo_lu));
572         }
573
574         RETURN(rc);
575 }
576
577 static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
578                     struct md_object *mo_s, const char *name)
579 {
580         int rc;
581         ENTRY;
582
583         //XXX: make sure that MDT checks name isn't exist
584
585         rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
586         if (rc == 0) {
587                 rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
588                                      name, lu_object_fid(&mo_s->mo_lu));
589         }
590
591         RETURN(rc);
592 }
593
594 static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
595                       struct md_object *mo_c, const char *name)
596 {
597         int rc;
598         ENTRY;
599
600         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
601         if (rc == 0) {
602                 rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
603                                      name);
604         }
605
606         RETURN(rc);
607 }
608
609 static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
610                        struct md_object *mo_pn, const struct lu_fid *lf,
611                        const char *s_name, struct md_object *mo_t,
612                        const char *t_name)
613 {
614         int rc;
615         ENTRY;
616
617         /* the mo_pn is remote directory, so we cannot even know if there is
618          * mo_t or not. Therefore mo_t is NULL here but remote server should do
619          * lookup and process this further */
620
621         LASSERT(mo_t == NULL);
622         rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_pn)), NULL/* mo_t */,
623                             lf, t_name);
624         /* only old name is removed localy */
625         if (rc == 0)
626                 rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
627                                      s_name);
628
629         RETURN(rc);
630 }
631
632 static int cmr_rename_tgt(const struct lu_context *ctx,
633                           struct md_object *mo_p, struct md_object *mo_t,
634                           const struct lu_fid *lf, const char *name)
635 {
636         int rc;
637         ENTRY;
638         /* target object is remote one */
639         rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_t)));
640         /* continue locally with name handling only */
641         if (rc == 0)
642                 rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
643                                     NULL, lf, name);
644         RETURN(rc);
645 }
646
647 static struct md_dir_operations cmr_dir_ops = {
648         .mdo_lookup      = cmr_lookup,
649         .mdo_create      = cmr_create,
650         .mdo_link        = cmr_link,
651         .mdo_unlink      = cmr_unlink,
652         .mdo_rename      = cmr_rename,
653         .mdo_rename_tgt  = cmr_rename_tgt,
654 };
655
656