Whamcloud - gitweb
LU-1303 lod: propagate real object size on a new striping
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012, Intel, Inc.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <obd.h>
40 #include <obd_class.h>
41 #include <lustre_ver.h>
42 #include <obd_support.h>
43 #include <lprocfs_status.h>
44
45 #include <lustre_fid.h>
46 #include <lustre_param.h>
47 #include <lustre_fid.h>
48 #include <obd_lov.h>
49
50 #include "lod_internal.h"
51
52 extern cfs_mem_cache_t *lod_object_kmem;
53 static const struct dt_body_operations lod_body_lnk_ops;
54
55 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
56                             struct dt_rec *rec, const struct dt_key *key,
57                             struct lustre_capa *capa)
58 {
59         struct dt_object *next = dt_object_child(dt);
60         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
61 }
62
63 static int lod_declare_index_insert(const struct lu_env *env,
64                                     struct dt_object *dt,
65                                     const struct dt_rec *rec,
66                                     const struct dt_key *key,
67                                     struct thandle *handle)
68 {
69         return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
70 }
71
72 static int lod_index_insert(const struct lu_env *env,
73                             struct dt_object *dt,
74                             const struct dt_rec *rec,
75                             const struct dt_key *key,
76                             struct thandle *th,
77                             struct lustre_capa *capa,
78                             int ign)
79 {
80         return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
81 }
82
83 static int lod_declare_index_delete(const struct lu_env *env,
84                                     struct dt_object *dt,
85                                     const struct dt_key *key,
86                                     struct thandle *th)
87 {
88         return dt_declare_delete(env, dt_object_child(dt), key, th);
89 }
90
91 static int lod_index_delete(const struct lu_env *env,
92                             struct dt_object *dt,
93                             const struct dt_key *key,
94                             struct thandle *th,
95                             struct lustre_capa *capa)
96 {
97         return dt_delete(env, dt_object_child(dt), key, th, capa);
98 }
99
100 static struct dt_it *lod_it_init(const struct lu_env *env,
101                                  struct dt_object *dt, __u32 attr,
102                                  struct lustre_capa *capa)
103 {
104         struct dt_object   *next = dt_object_child(dt);
105
106         return next->do_index_ops->dio_it.init(env, next, attr, capa);
107 }
108
109 static struct dt_index_operations lod_index_ops = {
110         .dio_lookup         = lod_index_lookup,
111         .dio_declare_insert = lod_declare_index_insert,
112         .dio_insert         = lod_index_insert,
113         .dio_declare_delete = lod_declare_index_delete,
114         .dio_delete         = lod_index_delete,
115         .dio_it     = {
116                 .init       = lod_it_init,
117         }
118 };
119
120 static void lod_object_read_lock(const struct lu_env *env,
121                                  struct dt_object *dt, unsigned role)
122 {
123         dt_read_lock(env, dt_object_child(dt), role);
124 }
125
126 static void lod_object_write_lock(const struct lu_env *env,
127                                   struct dt_object *dt, unsigned role)
128 {
129         dt_write_lock(env, dt_object_child(dt), role);
130 }
131
132 static void lod_object_read_unlock(const struct lu_env *env,
133                                    struct dt_object *dt)
134 {
135         dt_read_unlock(env, dt_object_child(dt));
136 }
137
138 static void lod_object_write_unlock(const struct lu_env *env,
139                                     struct dt_object *dt)
140 {
141         dt_write_unlock(env, dt_object_child(dt));
142 }
143
144 static int lod_object_write_locked(const struct lu_env *env,
145                                    struct dt_object *dt)
146 {
147         return dt_write_locked(env, dt_object_child(dt));
148 }
149
150 static int lod_attr_get(const struct lu_env *env,
151                         struct dt_object *dt,
152                         struct lu_attr *attr,
153                         struct lustre_capa *capa)
154 {
155         return dt_attr_get(env, dt_object_child(dt), attr, capa);
156 }
157
158 static int lod_declare_attr_set(const struct lu_env *env,
159                                 struct dt_object *dt,
160                                 const struct lu_attr *attr,
161                                 struct thandle *handle)
162 {
163         struct dt_object  *next = dt_object_child(dt);
164         struct lod_object *lo = lod_dt_obj(dt);
165         int                rc, i;
166         ENTRY;
167
168         /*
169          * declare setattr on the local object
170          */
171         rc = dt_declare_attr_set(env, next, attr, handle);
172         if (rc)
173                 RETURN(rc);
174
175         /*
176          * load striping information, notice we don't do this when object
177          * is being initialized as we don't need this information till
178          * few specific cases like destroy, chown
179          */
180         rc = lod_load_striping(env, lo);
181         if (rc)
182                 RETURN(rc);
183
184         /*
185          * if object is striped declare changes on the stripes
186          */
187         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
188         for (i = 0; i < lo->ldo_stripenr; i++) {
189                 LASSERT(lo->ldo_stripe[i]);
190                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
191                 if (rc) {
192                         CERROR("failed declaration: %d\n", rc);
193                         break;
194                 }
195         }
196
197         RETURN(rc);
198 }
199
200 static int lod_attr_set(const struct lu_env *env,
201                         struct dt_object *dt,
202                         const struct lu_attr *attr,
203                         struct thandle *handle,
204                         struct lustre_capa *capa)
205 {
206         struct dt_object  *next = dt_object_child(dt);
207         struct lod_object *lo = lod_dt_obj(dt);
208         int                rc, i;
209         ENTRY;
210
211         /*
212          * apply changes to the local object
213          */
214         rc = dt_attr_set(env, next, attr, handle, capa);
215         if (rc)
216                 RETURN(rc);
217
218         /*
219          * if object is striped, apply changes to all the stripes
220          */
221         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
222         for (i = 0; i < lo->ldo_stripenr; i++) {
223                 LASSERT(lo->ldo_stripe[i]);
224                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
225                 if (rc) {
226                         CERROR("failed declaration: %d\n", rc);
227                         break;
228                 }
229         }
230
231         RETURN(rc);
232 }
233
234 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
235                          struct lu_buf *buf, const char *name,
236                          struct lustre_capa *capa)
237 {
238         struct lod_thread_info  *info = lod_env_info(env);
239         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
240         int                      rc, is_root;
241         ENTRY;
242
243         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
244         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
245                 RETURN(rc);
246
247         /*
248          * lod returns default striping on the real root of the device
249          * this is like the root stores default striping for the whole
250          * filesystem. historically we've been using a different approach
251          * and store it in the config.
252          */
253         dt_root_get(env, dev->lod_child, &info->lti_fid);
254         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
255
256         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
257                 struct lov_user_md *lum = buf->lb_buf;
258                 struct lov_desc    *desc = &dev->lod_desc;
259
260                 if (buf->lb_buf == NULL) {
261                         rc = sizeof(struct lov_user_md_v1);
262                 } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) {
263                         lum->lmm_magic = LOV_USER_MAGIC_V1;
264                         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
265                         lum->lmm_pattern = desc->ld_pattern;
266                         lum->lmm_stripe_size = desc->ld_default_stripe_size;
267                         lum->lmm_stripe_count = desc->ld_default_stripe_count;
268                         lum->lmm_stripe_offset = desc->ld_default_stripe_offset;
269                         rc = sizeof(struct lov_user_md_v1);
270                 } else {
271                         rc = -ERANGE;
272                 }
273         }
274
275         RETURN(rc);
276 }
277
278 /*
279  * LOV xattr is a storage for striping, and LOD owns this xattr.
280  * but LOD allows others to control striping to some extent
281  * - to reset strping
282  * - to set new defined striping
283  * - to set new semi-defined striping
284  *   - number of stripes is defined
285  *   - number of stripes + osts are defined
286  *   - ??
287  */
288 static int lod_declare_xattr_set(const struct lu_env *env,
289                                  struct dt_object *dt,
290                                  const struct lu_buf *buf,
291                                  const char *name, int fl,
292                                  struct thandle *th)
293 {
294         struct dt_object *next = dt_object_child(dt);
295         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
296         __u32             mode;
297         int               rc;
298         ENTRY;
299
300         /*
301          * allow to declare predefined striping on a new (!mode) object
302          * which is supposed to be replay of regular file creation
303          * (when LOV setting is declared)
304          */
305         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
306         if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV)) {
307                 /*
308                  * this is a request to manipulate object's striping
309                  */
310                 if (dt_object_exists(dt)) {
311                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
312                         if (rc)
313                                 RETURN(rc);
314                 } else {
315                         memset(attr, 0, sizeof(attr));
316                         attr->la_valid = LA_TYPE | LA_MODE;
317                         attr->la_mode = S_IFREG;
318                 }
319                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
320                 if (rc)
321                         RETURN(rc);
322         }
323
324         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
325
326         RETURN(rc);
327 }
328
329 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
330                                     struct dt_object *dt,
331                                     const struct lu_buf *buf,
332                                     const char *name, int fl,
333                                     struct thandle *th,
334                                     struct lustre_capa *capa)
335 {
336         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
337         struct dt_object        *next = dt_object_child(dt);
338         struct lod_object       *l = lod_dt_obj(dt);
339         struct lov_user_md_v1   *lum;
340         struct lov_user_md_v3   *v3 = NULL;
341         int                      rc;
342         ENTRY;
343
344         LASSERT(l->ldo_stripe == NULL);
345         l->ldo_striping_cached = 0;
346         l->ldo_def_striping_set = 0;
347         lod_object_set_pool(l, NULL);
348         l->ldo_def_stripe_size = 0;
349         l->ldo_def_stripenr = 0;
350
351         LASSERT(buf);
352         LASSERT(buf->lb_buf);
353         lum = buf->lb_buf;
354
355         rc = lod_verify_striping(d, buf, 0);
356         if (rc)
357                 RETURN(rc);
358
359         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
360                 v3 = buf->lb_buf;
361
362         /* if { size, offset, count } = { 0, -1, 0 } and no pool
363          * (i.e. all default values specified) then delete default
364          * striping from dir. */
365         CDEBUG(D_OTHER,
366                 "set default striping: sz %u # %u offset %d %s %s\n",
367                 (unsigned)lum->lmm_stripe_size,
368                 (unsigned)lum->lmm_stripe_count,
369                 (int)lum->lmm_stripe_offset,
370                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
371
372         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
373                                 (lum->lmm_stripe_count),
374                                 (lum->lmm_stripe_offset)) &&
375                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
376                 rc = dt_xattr_del(env, next, name, th, capa);
377                 if (rc == -ENODATA)
378                         rc = 0;
379         } else {
380                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
381         }
382
383         RETURN(rc);
384 }
385
386 static int lod_xattr_set(const struct lu_env *env,
387                          struct dt_object *dt, const struct lu_buf *buf,
388                          const char *name, int fl, struct thandle *th,
389                          struct lustre_capa *capa)
390 {
391         struct dt_object *next = dt_object_child(dt);
392         __u32             attr;
393         int               rc;
394         ENTRY;
395
396         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
397         if (S_ISDIR(attr)) {
398                 if (strncmp(name, XATTR_NAME_LOV, strlen(XATTR_NAME_LOV)) == 0)
399                         rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
400                                                       fl, th, capa);
401                 else
402                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
403
404         } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
405                 /*
406                  * XXX: check striping match what we already have
407                  * during req replay, declare_xattr_set() defines striping,
408                  * then create() does the work
409                  */
410                 rc = lod_striping_create(env, dt, NULL, NULL, th);
411                 RETURN(rc);
412         } else {
413                 /*
414                  * behave transparantly for all other EAs
415                  */
416                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
417         }
418
419         RETURN(rc);
420 }
421
422 static int lod_declare_xattr_del(const struct lu_env *env,
423                                  struct dt_object *dt, const char *name,
424                                  struct thandle *th)
425 {
426         return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
427 }
428
429 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
430                          const char *name, struct thandle *th,
431                          struct lustre_capa *capa)
432 {
433         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
434 }
435
436 static int lod_xattr_list(const struct lu_env *env,
437                           struct dt_object *dt, struct lu_buf *buf,
438                           struct lustre_capa *capa)
439 {
440         return dt_xattr_list(env, dt_object_child(dt), buf, capa);
441 }
442
443 int lod_object_set_pool(struct lod_object *o, char *pool)
444 {
445         int len;
446
447         if (o->ldo_pool) {
448                 len = strlen(o->ldo_pool);
449                 OBD_FREE(o->ldo_pool, len + 1);
450                 o->ldo_pool = NULL;
451         }
452         if (pool) {
453                 len = strlen(pool);
454                 OBD_ALLOC(o->ldo_pool, len + 1);
455                 if (o->ldo_pool == NULL)
456                         return -ENOMEM;
457                 strcpy(o->ldo_pool, pool);
458         }
459         return 0;
460 }
461
462 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
463 {
464         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
465 }
466
467 static int lod_cache_parent_striping(const struct lu_env *env,
468                                      struct lod_object *lp)
469 {
470         struct lov_user_md_v1   *v1 = NULL;
471         struct lov_user_md_v3   *v3 = NULL;
472         int                      rc;
473         ENTRY;
474
475         /* dt_ah_init() is called from MDD without parent being write locked
476          * lock it here */
477         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
478         if (lp->ldo_striping_cached)
479                 GOTO(unlock, rc = 0);
480
481         rc = lod_get_lov_ea(env, lp);
482         if (rc < 0)
483                 GOTO(unlock, rc);
484
485         if (rc < sizeof(struct lov_user_md)) {
486                 /* don't lookup for non-existing or invalid striping */
487                 lp->ldo_def_striping_set = 0;
488                 lp->ldo_striping_cached = 1;
489                 lp->ldo_def_stripe_size = 0;
490                 lp->ldo_def_stripenr = 0;
491                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
492                 GOTO(unlock, rc = 0);
493         }
494
495         v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
496         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
497                 lustre_swab_lov_user_md_v1(v1);
498         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
499                 lustre_swab_lov_user_md_v3(v3);
500
501         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
502                 GOTO(unlock, rc = 0);
503
504         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
505                 GOTO(unlock, rc = 0);
506
507         lp->ldo_def_stripenr = v1->lmm_stripe_count;
508         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
509         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
510         lp->ldo_striping_cached = 1;
511         lp->ldo_def_striping_set = 1;
512
513         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
514                 /* XXX: sanity check here */
515                 v3 = (struct lov_user_md_v3 *) v1;
516                 if (v3->lmm_pool_name[0])
517                         lod_object_set_pool(lp, v3->lmm_pool_name);
518         }
519
520         CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
521                lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
522                lp->ldo_def_stripe_offset, v3 ? "from " : "",
523                v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
524
525         EXIT;
526 unlock:
527         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
528         return rc;
529 }
530
531 /**
532  * used to transfer default striping data to the object being created
533  */
534 static void lod_ah_init(const struct lu_env *env,
535                         struct dt_allocation_hint *ah,
536                         struct dt_object *parent,
537                         struct dt_object *child,
538                         cfs_umode_t child_mode)
539 {
540         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
541         struct dt_object  *nextp = NULL;
542         struct dt_object  *nextc;
543         struct lod_object *lp = NULL;
544         struct lod_object *lc;
545         struct lov_desc   *desc;
546         ENTRY;
547
548         LASSERT(child);
549
550         if (likely(parent)) {
551                 nextp = dt_object_child(parent);
552                 lp = lod_dt_obj(parent);
553         }
554
555         nextc = dt_object_child(child);
556         lc = lod_dt_obj(child);
557
558         LASSERT(lc->ldo_stripenr == 0);
559         LASSERT(lc->ldo_stripe == NULL);
560
561         /*
562          * local object may want some hints
563          * in case of late striping creation, ->ah_init()
564          * can be called with local object existing
565          */
566         if (!dt_object_exists(nextc))
567                 nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
568
569         if (S_ISDIR(child_mode)) {
570                 if (lp->ldo_striping_cached == 0) {
571                         /* we haven't tried to get default striping for
572                          * the directory yet, let's cache it in the object */
573                         lod_cache_parent_striping(env, lp);
574                 }
575                 /* transfer defaults to new directory */
576                 if (lp->ldo_striping_cached) {
577                         if (lp->ldo_pool)
578                                 lod_object_set_pool(lc, lp->ldo_pool);
579                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
580                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
581                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
582                         lc->ldo_striping_cached = 1;
583                         lc->ldo_def_striping_set = 1;
584                         CDEBUG(D_OTHER, "inherite striping defaults\n");
585                 }
586                 return;
587         }
588
589         /*
590          * if object is going to be striped over OSTs, transfer default
591          * striping information to the child, so that we can use it
592          * during declaration and creation
593          */
594         if (!lod_object_will_be_striped(S_ISREG(child_mode),
595                                         lu_object_fid(&child->do_lu)))
596                 return;
597
598         /*
599          * try from the parent
600          */
601         if (likely(parent)) {
602                 if (lp->ldo_striping_cached == 0) {
603                         /* we haven't tried to get default striping for
604                          * the directory yet, let's cache it in the object */
605                         lod_cache_parent_striping(env, lp);
606                 }
607
608                 lc->ldo_def_stripe_offset = (__u16) -1;
609
610                 if (lp->ldo_def_striping_set) {
611                         if (lp->ldo_pool)
612                                 lod_object_set_pool(lc, lp->ldo_pool);
613                         lc->ldo_stripenr = lp->ldo_def_stripenr;
614                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
615                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
616                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
617                                lc->ldo_stripenr, lc->ldo_stripe_size,
618                                lp->ldo_pool ? lp->ldo_pool : "");
619                 }
620         }
621
622         /*
623          * if the parent doesn't provide with specific pattern, grab fs-wide one
624          */
625         desc = &d->lod_desc;
626         if (lc->ldo_stripenr == 0)
627                 lc->ldo_stripenr = desc->ld_default_stripe_count;
628         if (lc->ldo_stripe_size == 0)
629                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
630         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
631                lc->ldo_stripenr, lc->ldo_stripe_size,
632                lc->ldo_pool ? lc->ldo_pool : "");
633
634         EXIT;
635 }
636
637 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
638 /*
639  * this function handles a special case when truncate was done
640  * on a stripeless object and now striping is being created
641  * we can't lose that size, so we have to propagate it to newly
642  * created object
643  */
644 static int lod_declare_init_size(const struct lu_env *env,
645                                  struct dt_object *dt, struct thandle *th)
646 {
647         struct dt_object   *next = dt_object_child(dt);
648         struct lod_object  *lo = lod_dt_obj(dt);
649         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
650         uint64_t            size, offs;
651         int                 rc, stripe;
652         ENTRY;
653
654         /* XXX: we support the simplest (RAID0) striping so far */
655         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
656         LASSERT(lo->ldo_stripe_size > 0);
657
658         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
659         LASSERT(attr->la_valid & LA_SIZE);
660         if (rc)
661                 RETURN(rc);
662
663         size = attr->la_size;
664         if (size == 0)
665                 RETURN(0);
666
667         /* ll_do_div64(a, b) returns a % b, and a = a / b */
668         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
669         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
670
671         size = size * lo->ldo_stripe_size;
672         offs = attr->la_size;
673         size += ll_do_div64(offs, lo->ldo_stripe_size);
674
675         attr->la_valid = LA_SIZE;
676         attr->la_size = size;
677
678         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
679
680         RETURN(rc);
681 }
682
683
684 /**
685  * Create declaration of striped object
686  */
687 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
688                                struct lu_attr *attr,
689                                const struct lu_buf *lovea, struct thandle *th)
690 {
691         struct lod_thread_info  *info = lod_env_info(env);
692         struct dt_object        *next = dt_object_child(dt);
693         struct lod_object       *lo = lod_dt_obj(dt);
694         int                      rc;
695         ENTRY;
696
697         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
698                 /* failed to create striping, let's reset
699                  * config so that others don't get confused */
700                 lod_object_free_striping(env, lo);
701                 GOTO(out, rc = -ENOMEM);
702         }
703
704         /* choose OST and generate appropriate objects */
705         rc = lod_qos_prep_create(env, lo, attr, lovea, th);
706         if (rc) {
707                 /* failed to create striping, let's reset
708                  * config so that others don't get confused */
709                 lod_object_free_striping(env, lo);
710                 GOTO(out, rc);
711         }
712
713         /*
714          * declare storage for striping data
715          */
716         info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
717                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
718         rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
719                                   0, th);
720         if (rc)
721                 GOTO(out, rc);
722
723         /*
724          * if striping is created with local object's size > 0,
725          * we have to propagate this size to specific object
726          * the case is possible only when local object was created previously
727          */
728         if (dt_object_exists(next))
729                 rc = lod_declare_init_size(env, dt, th);
730
731 out:
732         RETURN(rc);
733 }
734
735 static int lod_declare_object_create(const struct lu_env *env,
736                                      struct dt_object *dt,
737                                      struct lu_attr *attr,
738                                      struct dt_allocation_hint *hint,
739                                      struct dt_object_format *dof,
740                                      struct thandle *th)
741 {
742         struct dt_object   *next = dt_object_child(dt);
743         struct lod_object  *lo = lod_dt_obj(dt);
744         int                 rc;
745         ENTRY;
746
747         LASSERT(dof);
748         LASSERT(attr);
749         LASSERT(th);
750         LASSERT(!dt_object_exists(next));
751
752         /*
753          * first of all, we declare creation of local object
754          */
755         rc = dt_declare_create(env, next, attr, hint, dof, th);
756         if (rc)
757                 GOTO(out, rc);
758
759         if (dof->dof_type == DFT_SYM)
760                 dt->do_body_ops = &lod_body_lnk_ops;
761
762         /*
763          * it's lod_ah_init() who has decided the object will striped
764          */
765         if (dof->dof_type == DFT_REGULAR) {
766                 /* callers don't want stripes */
767                 /* XXX: all tricky interactions with ->ah_make_hint() decided
768                  * to use striping, then ->declare_create() behaving differently
769                  * should be cleaned */
770                 if (dof->u.dof_reg.striped == 0)
771                         lo->ldo_stripenr = 0;
772                 if (lo->ldo_stripenr > 0)
773                         rc = lod_declare_striped_object(env, dt, attr,
774                                                         NULL, th);
775         } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
776                 struct lod_thread_info *info = lod_env_info(env);
777
778                 info->lti_buf.lb_buf = NULL;
779                 info->lti_buf.lb_len = sizeof(struct lov_user_md_v3);
780                 /* to transfer default striping from the parent */
781                 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
782                                           XATTR_NAME_LOV, 0, th);
783         }
784
785 out:
786         RETURN(rc);
787 }
788
789 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
790                         struct lu_attr *attr, struct dt_object_format *dof,
791                         struct thandle *th)
792 {
793         struct lod_object *lo = lod_dt_obj(dt);
794         int                rc = 0, i;
795         ENTRY;
796
797         LASSERT(lo->ldo_stripe);
798         LASSERT(lo->ldo_stripe > 0);
799         LASSERT(lo->ldo_striping_cached == 0);
800
801         /* create all underlying objects */
802         for (i = 0; i < lo->ldo_stripenr; i++) {
803                 LASSERT(lo->ldo_stripe[i]);
804                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
805
806                 if (rc)
807                         break;
808         }
809         if (rc == 0)
810                 rc = lod_generate_and_set_lovea(env, lo, th);
811
812         RETURN(rc);
813 }
814
815 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
816                              struct lu_attr *attr,
817                              struct dt_allocation_hint *hint,
818                              struct dt_object_format *dof, struct thandle *th)
819 {
820         struct dt_object   *next = dt_object_child(dt);
821         struct lod_object  *lo = lod_dt_obj(dt);
822         int                 rc;
823         ENTRY;
824
825         /* create local object */
826         rc = dt_create(env, next, attr, hint, dof, th);
827
828         if (rc == 0) {
829                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
830                         rc = lod_store_def_striping(env, dt, th);
831                 else if (lo->ldo_stripe)
832                         rc = lod_striping_create(env, dt, attr, dof, th);
833         }
834
835         RETURN(rc);
836 }
837
838 static int lod_declare_object_destroy(const struct lu_env *env,
839                                       struct dt_object *dt,
840                                       struct thandle *th)
841 {
842         struct dt_object   *next = dt_object_child(dt);
843         struct lod_object  *lo = lod_dt_obj(dt);
844         int                 rc, i;
845         ENTRY;
846
847         /*
848          * we declare destroy for the local object
849          */
850         rc = dt_declare_destroy(env, next, th);
851         if (rc)
852                 RETURN(rc);
853
854         /*
855          * load striping information, notice we don't do this when object
856          * is being initialized as we don't need this information till
857          * few specific cases like destroy, chown
858          */
859         rc = lod_load_striping(env, lo);
860         if (rc)
861                 RETURN(rc);
862
863         /* declare destroy for all underlying objects */
864         for (i = 0; i < lo->ldo_stripenr; i++) {
865                 LASSERT(lo->ldo_stripe[i]);
866                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
867
868                 if (rc)
869                         break;
870         }
871
872         RETURN(rc);
873 }
874
875 static int lod_object_destroy(const struct lu_env *env,
876                 struct dt_object *dt, struct thandle *th)
877 {
878         struct dt_object  *next = dt_object_child(dt);
879         struct lod_object *lo = lod_dt_obj(dt);
880         int                rc, i;
881         ENTRY;
882
883         /* destroy local object */
884         rc = dt_destroy(env, next, th);
885         if (rc)
886                 RETURN(rc);
887
888         /* destroy all underlying objects */
889         for (i = 0; i < lo->ldo_stripenr; i++) {
890                 LASSERT(lo->ldo_stripe[i]);
891                 rc = dt_destroy(env, lo->ldo_stripe[i], th);
892                 if (rc)
893                         break;
894         }
895
896         RETURN(rc);
897 }
898
899 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
900                          const struct dt_index_features *feat)
901 {
902         struct dt_object *next = dt_object_child(dt);
903         int               rc;
904         ENTRY;
905
906         LASSERT(next->do_ops);
907         LASSERT(next->do_ops->do_index_try);
908
909         rc = next->do_ops->do_index_try(env, next, feat);
910         if (next->do_index_ops && dt->do_index_ops == NULL) {
911                 dt->do_index_ops = &lod_index_ops;
912                 /* XXX: iterators don't accept device, so bypass LOD */
913                 /* will be fixed with DNE */
914                 if (lod_index_ops.dio_it.fini == NULL) {
915                         lod_index_ops.dio_it = next->do_index_ops->dio_it;
916                         lod_index_ops.dio_it.init = lod_it_init;
917                 }
918         }
919
920         RETURN(rc);
921 }
922
923 static int lod_declare_ref_add(const struct lu_env *env,
924                                struct dt_object *dt, struct thandle *th)
925 {
926         return dt_declare_ref_add(env, dt_object_child(dt), th);
927 }
928
929 static int lod_ref_add(const struct lu_env *env,
930                        struct dt_object *dt, struct thandle *th)
931 {
932         return dt_ref_add(env, dt_object_child(dt), th);
933 }
934
935 static int lod_declare_ref_del(const struct lu_env *env,
936                                struct dt_object *dt, struct thandle *th)
937 {
938         return dt_declare_ref_del(env, dt_object_child(dt), th);
939 }
940
941 static int lod_ref_del(const struct lu_env *env,
942                        struct dt_object *dt, struct thandle *th)
943 {
944         return dt_ref_del(env, dt_object_child(dt), th);
945 }
946
947 static struct obd_capa *lod_capa_get(const struct lu_env *env,
948                                      struct dt_object *dt,
949                                      struct lustre_capa *old, __u64 opc)
950 {
951         return dt_capa_get(env, dt_object_child(dt), old, opc);
952 }
953
954 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
955 {
956         return dt_object_sync(env, dt_object_child(dt));
957 }
958
959 struct dt_object_operations lod_obj_ops = {
960         .do_read_lock           = lod_object_read_lock,
961         .do_write_lock          = lod_object_write_lock,
962         .do_read_unlock         = lod_object_read_unlock,
963         .do_write_unlock        = lod_object_write_unlock,
964         .do_write_locked        = lod_object_write_locked,
965         .do_attr_get            = lod_attr_get,
966         .do_declare_attr_set    = lod_declare_attr_set,
967         .do_attr_set            = lod_attr_set,
968         .do_xattr_get           = lod_xattr_get,
969         .do_declare_xattr_set   = lod_declare_xattr_set,
970         .do_xattr_set           = lod_xattr_set,
971         .do_declare_xattr_del   = lod_declare_xattr_del,
972         .do_xattr_del           = lod_xattr_del,
973         .do_xattr_list          = lod_xattr_list,
974         .do_ah_init             = lod_ah_init,
975         .do_declare_create      = lod_declare_object_create,
976         .do_create              = lod_object_create,
977         .do_declare_destroy     = lod_declare_object_destroy,
978         .do_destroy             = lod_object_destroy,
979         .do_index_try           = lod_index_try,
980         .do_declare_ref_add     = lod_declare_ref_add,
981         .do_ref_add             = lod_ref_add,
982         .do_declare_ref_del     = lod_declare_ref_del,
983         .do_ref_del             = lod_ref_del,
984         .do_capa_get            = lod_capa_get,
985         .do_object_sync         = lod_object_sync,
986 };
987
988 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
989                         struct lu_buf *buf, loff_t *pos,
990                         struct lustre_capa *capa)
991 {
992         struct dt_object *next = dt_object_child(dt);
993         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
994 }
995
996 static ssize_t lod_declare_write(const struct lu_env *env,
997                                  struct dt_object *dt,
998                                  const loff_t size, loff_t pos,
999                                  struct thandle *th)
1000 {
1001         return dt_declare_record_write(env, dt_object_child(dt),
1002                                        size, pos, th);
1003 }
1004
1005 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
1006                          const struct lu_buf *buf, loff_t *pos,
1007                          struct thandle *th, struct lustre_capa *capa, int iq)
1008 {
1009         struct dt_object *next = dt_object_child(dt);
1010         LASSERT(next);
1011         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
1012 }
1013
1014 static const struct dt_body_operations lod_body_lnk_ops = {
1015         .dbo_read               = lod_read,
1016         .dbo_declare_write      = lod_declare_write,
1017         .dbo_write              = lod_write
1018 };
1019
1020 static int lod_object_init(const struct lu_env *env, struct lu_object *o,
1021                            const struct lu_object_conf *conf)
1022 {
1023         struct lod_device *d = lu2lod_dev(o->lo_dev);
1024         struct lu_object  *below;
1025         struct lu_device  *under;
1026         ENTRY;
1027
1028         /*
1029          * create local object
1030          */
1031         under = &d->lod_child->dd_lu_dev;
1032         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
1033         if (below == NULL)
1034                 RETURN(-ENOMEM);
1035
1036         lu_object_add(o, below);
1037
1038         RETURN(0);
1039 }
1040
1041 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
1042 {
1043         int i;
1044
1045         if (lo->ldo_stripe) {
1046                 LASSERT(lo->ldo_stripes_allocated > 0);
1047
1048                 for (i = 0; i < lo->ldo_stripenr; i++) {
1049                         if (lo->ldo_stripe[i])
1050                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
1051                 }
1052
1053                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
1054                 OBD_FREE(lo->ldo_stripe, i);
1055                 lo->ldo_stripe = NULL;
1056                 lo->ldo_stripes_allocated = 0;
1057         }
1058         lo->ldo_stripenr = 0;
1059 }
1060
1061 /*
1062  * ->start is called once all slices are initialized, including header's
1063  * cache for mode (object type). using the type we can initialize ops
1064  */
1065 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
1066 {
1067         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
1068                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
1069         return 0;
1070 }
1071
1072 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
1073 {
1074         struct lod_object *mo = lu2lod_obj(o);
1075
1076         /*
1077          * release all underlying object pinned
1078          */
1079
1080         lod_object_free_striping(env, mo);
1081
1082         lod_object_set_pool(mo, NULL);
1083
1084         lu_object_fini(o);
1085         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
1086 }
1087
1088 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
1089 {
1090         /* XXX: shouldn't we release everything here in case if object
1091          * creation failed before? */
1092 }
1093
1094 static int lod_object_print(const struct lu_env *env, void *cookie,
1095                             lu_printer_t p, const struct lu_object *l)
1096 {
1097         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
1098
1099         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
1100 }
1101
1102 struct lu_object_operations lod_lu_obj_ops = {
1103         .loo_object_init        = lod_object_init,
1104         .loo_object_start       = lod_object_start,
1105         .loo_object_free        = lod_object_free,
1106         .loo_object_release     = lod_object_release,
1107         .loo_object_print       = lod_object_print,
1108 };