Whamcloud - gitweb
LU-1302 llog: pass lu_env as parametr in llog functions
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/dt_object.c
37  *
38  * Dt Object.
39  * Generic functions from dt_object.h
40  *
41  * Author: Nikita Danilov <nikita@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <obd.h>
47 #include <dt_object.h>
48 #include <libcfs/list.h>
49 /* fid_be_to_cpu() */
50 #include <lustre_fid.h>
51
52 #include <lquota.h>
53
54 struct dt_find_hint {
55         struct lu_fid        *dfh_fid;
56         struct dt_device     *dfh_dt;
57         struct dt_object     *dfh_o;
58 };
59
60 struct dt_thread_info {
61         char                    dti_buf[DT_MAX_PATH];
62         struct dt_find_hint     dti_dfh;
63 };
64
65 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
66 LU_KEY_INIT(dt_global, struct dt_thread_info);
67 LU_KEY_FINI(dt_global, struct dt_thread_info);
68
69 static struct lu_context_key dt_key = {
70         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
71         .lct_init = dt_global_key_init,
72         .lct_fini = dt_global_key_fini
73 };
74
75 /* no lock is necessary to protect the list, because call-backs
76  * are added during system startup. Please refer to "struct dt_device".
77  */
78 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
79 {
80         cfs_list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
81 }
82 EXPORT_SYMBOL(dt_txn_callback_add);
83
84 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
85 {
86         cfs_list_del_init(&cb->dtc_linkage);
87 }
88 EXPORT_SYMBOL(dt_txn_callback_del);
89
90 int dt_txn_hook_start(const struct lu_env *env,
91                       struct dt_device *dev, struct thandle *th)
92 {
93         int rc = 0;
94         struct dt_txn_callback *cb;
95
96         if (th->th_local)
97                 return 0;
98
99         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
100                 if (cb->dtc_txn_start == NULL ||
101                     !(cb->dtc_tag & env->le_ctx.lc_tags))
102                         continue;
103                 rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
104                 if (rc < 0)
105                         break;
106         }
107         return rc;
108 }
109 EXPORT_SYMBOL(dt_txn_hook_start);
110
111 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
112 {
113         struct dt_device       *dev = txn->th_dev;
114         struct dt_txn_callback *cb;
115         int                     rc = 0;
116
117         if (txn->th_local)
118                 return 0;
119
120         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
121                 if (cb->dtc_txn_stop == NULL ||
122                     !(cb->dtc_tag & env->le_ctx.lc_tags))
123                         continue;
124                 rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
125                 if (rc < 0)
126                         break;
127         }
128         return rc;
129 }
130 EXPORT_SYMBOL(dt_txn_hook_stop);
131
132 void dt_txn_hook_commit(struct thandle *txn)
133 {
134         struct dt_txn_callback *cb;
135
136         if (txn->th_local)
137                 return;
138
139         cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
140                                 dtc_linkage) {
141                 if (cb->dtc_txn_commit)
142                         cb->dtc_txn_commit(txn, cb->dtc_cookie);
143         }
144 }
145 EXPORT_SYMBOL(dt_txn_hook_commit);
146
147 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
148 {
149
150         CFS_INIT_LIST_HEAD(&dev->dd_txn_callbacks);
151         return lu_device_init(&dev->dd_lu_dev, t);
152 }
153 EXPORT_SYMBOL(dt_device_init);
154
155 void dt_device_fini(struct dt_device *dev)
156 {
157         lu_device_fini(&dev->dd_lu_dev);
158 }
159 EXPORT_SYMBOL(dt_device_fini);
160
161 int dt_object_init(struct dt_object *obj,
162                    struct lu_object_header *h, struct lu_device *d)
163
164 {
165         return lu_object_init(&obj->do_lu, h, d);
166 }
167 EXPORT_SYMBOL(dt_object_init);
168
169 void dt_object_fini(struct dt_object *obj)
170 {
171         lu_object_fini(&obj->do_lu);
172 }
173 EXPORT_SYMBOL(dt_object_fini);
174
175 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
176 {
177         if (obj->do_index_ops == NULL)
178                 obj->do_ops->do_index_try(env, obj, &dt_directory_features);
179         return obj->do_index_ops != NULL;
180 }
181 EXPORT_SYMBOL(dt_try_as_dir);
182
183 enum dt_format_type dt_mode_to_dft(__u32 mode)
184 {
185         enum dt_format_type result;
186
187         switch (mode & S_IFMT) {
188         case S_IFDIR:
189                 result = DFT_DIR;
190                 break;
191         case S_IFREG:
192                 result = DFT_REGULAR;
193                 break;
194         case S_IFLNK:
195                 result = DFT_SYM;
196                 break;
197         case S_IFCHR:
198         case S_IFBLK:
199         case S_IFIFO:
200         case S_IFSOCK:
201                 result = DFT_NODE;
202                 break;
203         default:
204                 LBUG();
205                 break;
206         }
207         return result;
208 }
209 EXPORT_SYMBOL(dt_mode_to_dft);
210
211 /**
212  * lookup fid for object named \a name in directory \a dir.
213  */
214
215 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
216                   const char *name, struct lu_fid *fid)
217 {
218         if (dt_try_as_dir(env, dir))
219                 return dt_lookup(env, dir, (struct dt_rec *)fid,
220                                  (const struct dt_key *)name, BYPASS_CAPA);
221         return -ENOTDIR;
222 }
223 EXPORT_SYMBOL(dt_lookup_dir);
224 /**
225  * get object for given \a fid.
226  */
227 struct dt_object *dt_locate(const struct lu_env *env,
228                             struct dt_device *dev,
229                             const struct lu_fid *fid)
230 {
231         struct lu_object *obj;
232         struct dt_object *dt;
233
234         obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
235         if (!IS_ERR(obj)) {
236                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
237                 LASSERT(obj != NULL);
238                 dt = container_of(obj, struct dt_object, do_lu);
239         } else
240                 dt = (struct dt_object *)obj;
241         return dt;
242 }
243 EXPORT_SYMBOL(dt_locate);
244
245 /**
246  * find a object named \a entry in given \a dfh->dfh_o directory.
247  */
248 static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
249 {
250         struct dt_find_hint  *dfh = data;
251         struct dt_device     *dt = dfh->dfh_dt;
252         struct lu_fid        *fid = dfh->dfh_fid;
253         struct dt_object     *obj = dfh->dfh_o;
254         int                   result;
255
256         result = dt_lookup_dir(env, obj, entry, fid);
257         lu_object_put(env, &obj->do_lu);
258         if (result == 0) {
259                 obj = dt_locate(env, dt, fid);
260                 if (IS_ERR(obj))
261                         result = PTR_ERR(obj);
262         }
263         dfh->dfh_o = obj;
264         return result;
265 }
266
267 /**
268  * Abstract function which parses path name. This function feeds
269  * path component to \a entry_func.
270  */
271 int dt_path_parser(const struct lu_env *env,
272                    char *path, dt_entry_func_t entry_func,
273                    void *data)
274 {
275         char *e;
276         int rc = 0;
277
278         while (1) {
279                 e = strsep(&path, "/");
280                 if (e == NULL)
281                         break;
282
283                 if (e[0] == 0) {
284                         if (!path || path[0] == '\0')
285                                 break;
286                         continue;
287                 }
288                 rc = entry_func(env, e, data);
289                 if (rc)
290                         break;
291         }
292
293         return rc;
294 }
295
296 static struct dt_object *dt_store_resolve(const struct lu_env *env,
297                                           struct dt_device *dt,
298                                           const char *path,
299                                           struct lu_fid *fid)
300 {
301         struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
302                                                          &dt_key);
303         struct dt_find_hint *dfh = &info->dti_dfh;
304         struct dt_object     *obj;
305         char *local = info->dti_buf;
306         int result;
307
308         dfh->dfh_dt = dt;
309         dfh->dfh_fid = fid;
310
311         strncpy(local, path, DT_MAX_PATH);
312         local[DT_MAX_PATH - 1] = '\0';
313
314         result = dt->dd_ops->dt_root_get(env, dt, fid);
315         if (result == 0) {
316                 obj = dt_locate(env, dt, fid);
317                 if (!IS_ERR(obj)) {
318                         dfh->dfh_o = obj;
319                         result = dt_path_parser(env, local, dt_find_entry, dfh);
320                         if (result != 0)
321                                 obj = ERR_PTR(result);
322                         else
323                                 obj = dfh->dfh_o;
324                 }
325         } else {
326                 obj = ERR_PTR(result);
327         }
328         return obj;
329 }
330
331 static struct dt_object *dt_reg_open(const struct lu_env *env,
332                                      struct dt_device *dt,
333                                      struct dt_object *p,
334                                      const char *name,
335                                      struct lu_fid *fid)
336 {
337         struct dt_object *o;
338         int result;
339
340         result = dt_lookup_dir(env, p, name, fid);
341         if (result == 0){
342                 o = dt_locate(env, dt, fid);
343         }
344         else
345                 o = ERR_PTR(result);
346
347         return o;
348 }
349
350 /**
351  * Open dt object named \a filename from \a dirname directory.
352  *      \param  dt      dt device
353  *      \param  fid     on success, object fid is stored in *fid
354  */
355 struct dt_object *dt_store_open(const struct lu_env *env,
356                                 struct dt_device *dt,
357                                 const char *dirname,
358                                 const char *filename,
359                                 struct lu_fid *fid)
360 {
361         struct dt_object *file;
362         struct dt_object *dir;
363
364         dir = dt_store_resolve(env, dt, dirname, fid);
365         if (!IS_ERR(dir)) {
366                 file = dt_reg_open(env, dt, dir,
367                                    filename, fid);
368                 lu_object_put(env, &dir->do_lu);
369         } else {
370                 file = dir;
371         }
372         return file;
373 }
374 EXPORT_SYMBOL(dt_store_open);
375
376 struct dt_object *dt_find_or_create(const struct lu_env *env,
377                                     struct dt_device *dt,
378                                     const struct lu_fid *fid,
379                                     struct dt_object_format *dof,
380                                     struct lu_attr *at)
381 {
382         struct dt_object *dto;
383         struct thandle *th;
384         int rc;
385
386         ENTRY;
387
388         dto = dt_locate(env, dt, fid);
389         if (IS_ERR(dto))
390                 RETURN(dto);
391
392         LASSERT(dto != NULL);
393         if (dt_object_exists(dto))
394                 RETURN(dto);
395
396         th = dt_trans_create(env, dt);
397         if (IS_ERR(th))
398                 GOTO(out, rc = PTR_ERR(th));
399
400         rc = dt_declare_create(env, dto, at, NULL, dof, th);
401         if (rc)
402                 GOTO(trans_stop, rc);
403
404         rc = dt_trans_start_local(env, dt, th);
405         if (rc)
406                 GOTO(trans_stop, rc);
407
408         dt_write_lock(env, dto, 0);
409         if (dt_object_exists(dto))
410                 GOTO(unlock, rc = 0);
411
412         CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid));
413
414         rc = dt_create(env, dto, at, NULL, dof, th);
415         if (rc)
416                 GOTO(unlock, rc);
417         LASSERT(dt_object_exists(dto));
418 unlock:
419         dt_write_unlock(env, dto);
420 trans_stop:
421         dt_trans_stop(env, dt, th);
422 out:
423         if (rc) {
424                 lu_object_put(env, &dto->do_lu);
425                 RETURN(ERR_PTR(rc));
426         }
427         RETURN(dto);
428 }
429 EXPORT_SYMBOL(dt_find_or_create);
430
431 /* dt class init function. */
432 int dt_global_init(void)
433 {
434         int result;
435
436         LU_CONTEXT_KEY_INIT(&dt_key);
437         result = lu_context_key_register(&dt_key);
438         return result;
439 }
440
441 void dt_global_fini(void)
442 {
443         lu_context_key_degister(&dt_key);
444 }
445
446 /**
447  * Generic read helper. May return an error for partial reads.
448  *
449  * \param env  lustre environment
450  * \param dt   object to be read
451  * \param buf  lu_buf to be filled, with buffer pointer and length
452  * \param pos position to start reading, updated as data is read
453  *
454  * \retval real size of data read
455  * \retval -ve errno on failure
456  */
457 int dt_read(const struct lu_env *env, struct dt_object *dt,
458             struct lu_buf *buf, loff_t *pos)
459 {
460         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
461         return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
462 }
463 EXPORT_SYMBOL(dt_read);
464
465 /**
466  * Read structures of fixed size from storage.  Unlike dt_read(), using
467  * dt_record_read() will return an error for partial reads.
468  *
469  * \param env  lustre environment
470  * \param dt   object to be read
471  * \param buf  lu_buf to be filled, with buffer pointer and length
472  * \param pos position to start reading, updated as data is read
473  *
474  * \retval 0 on successfully reading full buffer
475  * \retval -EFAULT on short read
476  * \retval -ve errno on failure
477  */
478 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
479                    struct lu_buf *buf, loff_t *pos)
480 {
481         int rc;
482
483         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
484
485         rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
486
487         if (rc == buf->lb_len)
488                 rc = 0;
489         else if (rc >= 0)
490                 rc = -EFAULT;
491         return rc;
492 }
493 EXPORT_SYMBOL(dt_record_read);
494
495 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
496                     const struct lu_buf *buf, loff_t *pos, struct thandle *th)
497 {
498         int rc;
499
500         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
501         LASSERT(th != NULL);
502         LASSERT(dt->do_body_ops);
503         LASSERT(dt->do_body_ops->dbo_write);
504         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
505         if (rc == buf->lb_len)
506                 rc = 0;
507         else if (rc >= 0)
508                 rc = -EFAULT;
509         return rc;
510 }
511 EXPORT_SYMBOL(dt_record_write);
512
513 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
514                            struct thandle *th)
515 {
516         struct lu_buf vbuf;
517         char *xname = XATTR_NAME_VERSION;
518
519         LASSERT(o);
520         vbuf.lb_buf = NULL;
521         vbuf.lb_len = sizeof(dt_obj_version_t);
522         return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
523
524 }
525 EXPORT_SYMBOL(dt_declare_version_set);
526
527 void dt_version_set(const struct lu_env *env, struct dt_object *o,
528                     dt_obj_version_t version, struct thandle *th)
529 {
530         struct lu_buf vbuf;
531         char *xname = XATTR_NAME_VERSION;
532         int rc;
533
534         LASSERT(o);
535         vbuf.lb_buf = &version;
536         vbuf.lb_len = sizeof(version);
537
538         rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
539         if (rc < 0)
540                 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
541         return;
542 }
543 EXPORT_SYMBOL(dt_version_set);
544
545 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
546 {
547         struct lu_buf vbuf;
548         char *xname = XATTR_NAME_VERSION;
549         dt_obj_version_t version;
550         int rc;
551
552         LASSERT(o);
553         vbuf.lb_buf = &version;
554         vbuf.lb_len = sizeof(version);
555         rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
556         if (rc != sizeof(version)) {
557                 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
558                 version = 0;
559         }
560         return version;
561 }
562 EXPORT_SYMBOL(dt_version_get);
563
564 /* list of all supported index types */
565
566 /* directories */
567 const struct dt_index_features dt_directory_features;
568 EXPORT_SYMBOL(dt_directory_features);
569
570 /* scrub iterator */
571 const struct dt_index_features dt_otable_features;
572 EXPORT_SYMBOL(dt_otable_features);
573
574 /* accounting indexes */
575 const struct dt_index_features dt_acct_features = {
576         .dif_flags              = DT_IND_UPDATE,
577         .dif_keysize_min        = sizeof(__u64), /* 64-bit uid/gid */
578         .dif_keysize_max        = sizeof(__u64), /* 64-bit uid/gid */
579         .dif_recsize_min        = sizeof(struct acct_rec), /* 32 bytes */
580         .dif_recsize_max        = sizeof(struct acct_rec), /* 32 bytes */
581         .dif_ptrsize            = 4
582 };
583 EXPORT_SYMBOL(dt_acct_features);