Whamcloud - gitweb
876554152689584dfd019b5e9f2e4778d78981c9
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/dt_object.c
37  *
38  * Dt Object.
39  * Generic functions from dt_object.h
40  *
41  * Author: Nikita Danilov <nikita@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #include <obd.h>
50 #include <dt_object.h>
51 #include <libcfs/list.h>
52 /* fid_be_to_cpu() */
53 #include <lustre_fid.h>
54
55 struct dt_find_hint {
56         struct lu_fid        *dfh_fid;
57         struct dt_device     *dfh_dt;
58         struct dt_object     *dfh_o;
59 };
60
61 struct dt_thread_info {
62         char                    dti_buf[DT_MAX_PATH];
63         struct dt_find_hint     dti_dfh;
64 };
65
66 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
67 LU_KEY_INIT(dt_global, struct dt_thread_info);
68 LU_KEY_FINI(dt_global, struct dt_thread_info);
69
70 static struct lu_context_key dt_key = {
71         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
72         .lct_init = dt_global_key_init,
73         .lct_fini = dt_global_key_fini
74 };
75
76 /* no lock is necessary to protect the list, because call-backs
77  * are added during system startup. Please refer to "struct dt_device".
78  */
79 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
80 {
81         cfs_list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
82 }
83 EXPORT_SYMBOL(dt_txn_callback_add);
84
85 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
86 {
87         cfs_list_del_init(&cb->dtc_linkage);
88 }
89 EXPORT_SYMBOL(dt_txn_callback_del);
90
91 int dt_txn_hook_start(const struct lu_env *env,
92                       struct dt_device *dev, struct thandle *th)
93 {
94         int rc = 0;
95         struct dt_txn_callback *cb;
96
97         if (th->th_local)
98                 return 0;
99
100         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
101                 if (cb->dtc_txn_start == NULL ||
102                     !(cb->dtc_tag & env->le_ctx.lc_tags))
103                         continue;
104                 rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
105                 if (rc < 0)
106                         break;
107         }
108         return rc;
109 }
110 EXPORT_SYMBOL(dt_txn_hook_start);
111
112 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
113 {
114         struct dt_device       *dev = txn->th_dev;
115         struct dt_txn_callback *cb;
116         int                     rc = 0;
117
118         if (txn->th_local)
119                 return 0;
120
121         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
122                 if (cb->dtc_txn_stop == NULL ||
123                     !(cb->dtc_tag & env->le_ctx.lc_tags))
124                         continue;
125                 rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
126                 if (rc < 0)
127                         break;
128         }
129         return rc;
130 }
131 EXPORT_SYMBOL(dt_txn_hook_stop);
132
133 void dt_txn_hook_commit(struct thandle *txn)
134 {
135         struct dt_txn_callback *cb;
136
137         if (txn->th_local)
138                 return;
139
140         cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
141                                 dtc_linkage) {
142                 if (cb->dtc_txn_commit)
143                         cb->dtc_txn_commit(txn, cb->dtc_cookie);
144         }
145 }
146 EXPORT_SYMBOL(dt_txn_hook_commit);
147
148 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
149 {
150
151         CFS_INIT_LIST_HEAD(&dev->dd_txn_callbacks);
152         return lu_device_init(&dev->dd_lu_dev, t);
153 }
154 EXPORT_SYMBOL(dt_device_init);
155
156 void dt_device_fini(struct dt_device *dev)
157 {
158         lu_device_fini(&dev->dd_lu_dev);
159 }
160 EXPORT_SYMBOL(dt_device_fini);
161
162 int dt_object_init(struct dt_object *obj,
163                    struct lu_object_header *h, struct lu_device *d)
164
165 {
166         return lu_object_init(&obj->do_lu, h, d);
167 }
168 EXPORT_SYMBOL(dt_object_init);
169
170 void dt_object_fini(struct dt_object *obj)
171 {
172         lu_object_fini(&obj->do_lu);
173 }
174 EXPORT_SYMBOL(dt_object_fini);
175
176 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
177 {
178         if (obj->do_index_ops == NULL)
179                 obj->do_ops->do_index_try(env, obj, &dt_directory_features);
180         return obj->do_index_ops != NULL;
181 }
182 EXPORT_SYMBOL(dt_try_as_dir);
183
184 enum dt_format_type dt_mode_to_dft(__u32 mode)
185 {
186         enum dt_format_type result;
187
188         switch (mode & S_IFMT) {
189         case S_IFDIR:
190                 result = DFT_DIR;
191                 break;
192         case S_IFREG:
193                 result = DFT_REGULAR;
194                 break;
195         case S_IFLNK:
196                 result = DFT_SYM;
197                 break;
198         case S_IFCHR:
199         case S_IFBLK:
200         case S_IFIFO:
201         case S_IFSOCK:
202                 result = DFT_NODE;
203                 break;
204         default:
205                 LBUG();
206                 break;
207         }
208         return result;
209 }
210 EXPORT_SYMBOL(dt_mode_to_dft);
211
212 /**
213  * lookup fid for object named \a name in directory \a dir.
214  */
215
216 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
217                   const char *name, struct lu_fid *fid)
218 {
219         if (dt_try_as_dir(env, dir))
220                 return dt_lookup(env, dir, (struct dt_rec *)fid,
221                                  (const struct dt_key *)name, BYPASS_CAPA);
222         return -ENOTDIR;
223 }
224 EXPORT_SYMBOL(dt_lookup_dir);
225 /**
226  * get object for given \a fid.
227  */
228 struct dt_object *dt_locate(const struct lu_env *env,
229                             struct dt_device *dev,
230                             const struct lu_fid *fid)
231 {
232         struct lu_object *obj;
233         struct dt_object *dt;
234
235         obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
236         if (!IS_ERR(obj)) {
237                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
238                 LASSERT(obj != NULL);
239                 dt = container_of(obj, struct dt_object, do_lu);
240         } else
241                 dt = (struct dt_object *)obj;
242         return dt;
243 }
244 EXPORT_SYMBOL(dt_locate);
245
246 /**
247  * find a object named \a entry in given \a dfh->dfh_o directory.
248  */
249 static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
250 {
251         struct dt_find_hint  *dfh = data;
252         struct dt_device     *dt = dfh->dfh_dt;
253         struct lu_fid        *fid = dfh->dfh_fid;
254         struct dt_object     *obj = dfh->dfh_o;
255         int                   result;
256
257         result = dt_lookup_dir(env, obj, entry, fid);
258         lu_object_put(env, &obj->do_lu);
259         if (result == 0) {
260                 obj = dt_locate(env, dt, fid);
261                 if (IS_ERR(obj))
262                         result = PTR_ERR(obj);
263         }
264         dfh->dfh_o = obj;
265         return result;
266 }
267
268 /**
269  * Abstract function which parses path name. This function feeds
270  * path component to \a entry_func.
271  */
272 int dt_path_parser(const struct lu_env *env,
273                    char *path, dt_entry_func_t entry_func,
274                    void *data)
275 {
276         char *e;
277         int rc = 0;
278
279         while (1) {
280                 e = strsep(&path, "/");
281                 if (e == NULL)
282                         break;
283
284                 if (e[0] == 0) {
285                         if (!path || path[0] == '\0')
286                                 break;
287                         continue;
288                 }
289                 rc = entry_func(env, e, data);
290                 if (rc)
291                         break;
292         }
293
294         return rc;
295 }
296
297 static struct dt_object *dt_store_resolve(const struct lu_env *env,
298                                           struct dt_device *dt,
299                                           const char *path,
300                                           struct lu_fid *fid)
301 {
302         struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
303                                                          &dt_key);
304         struct dt_find_hint *dfh = &info->dti_dfh;
305         struct dt_object     *obj;
306         char *local = info->dti_buf;
307         int result;
308
309         dfh->dfh_dt = dt;
310         dfh->dfh_fid = fid;
311
312         strncpy(local, path, DT_MAX_PATH);
313         local[DT_MAX_PATH - 1] = '\0';
314
315         result = dt->dd_ops->dt_root_get(env, dt, fid);
316         if (result == 0) {
317                 obj = dt_locate(env, dt, fid);
318                 if (!IS_ERR(obj)) {
319                         dfh->dfh_o = obj;
320                         result = dt_path_parser(env, local, dt_find_entry, dfh);
321                         if (result != 0)
322                                 obj = ERR_PTR(result);
323                         else
324                                 obj = dfh->dfh_o;
325                 }
326         } else {
327                 obj = ERR_PTR(result);
328         }
329         return obj;
330 }
331
332 static struct dt_object *dt_reg_open(const struct lu_env *env,
333                                      struct dt_device *dt,
334                                      struct dt_object *p,
335                                      const char *name,
336                                      struct lu_fid *fid)
337 {
338         struct dt_object *o;
339         int result;
340
341         result = dt_lookup_dir(env, p, name, fid);
342         if (result == 0){
343                 o = dt_locate(env, dt, fid);
344         }
345         else
346                 o = ERR_PTR(result);
347
348         return o;
349 }
350
351 /**
352  * Open dt object named \a filename from \a dirname directory.
353  *      \param  dt      dt device
354  *      \param  fid     on success, object fid is stored in *fid
355  */
356 struct dt_object *dt_store_open(const struct lu_env *env,
357                                 struct dt_device *dt,
358                                 const char *dirname,
359                                 const char *filename,
360                                 struct lu_fid *fid)
361 {
362         struct dt_object *file;
363         struct dt_object *dir;
364
365         dir = dt_store_resolve(env, dt, dirname, fid);
366         if (!IS_ERR(dir)) {
367                 file = dt_reg_open(env, dt, dir,
368                                    filename, fid);
369                 lu_object_put(env, &dir->do_lu);
370         } else {
371                 file = dir;
372         }
373         return file;
374 }
375 EXPORT_SYMBOL(dt_store_open);
376
377 struct dt_object *dt_find_or_create(const struct lu_env *env,
378                                     struct dt_device *dt,
379                                     const struct lu_fid *fid,
380                                     struct dt_object_format *dof,
381                                     struct lu_attr *at)
382 {
383         struct dt_object *dto;
384         struct thandle *th;
385         int rc;
386
387         ENTRY;
388
389         dto = dt_locate(env, dt, fid);
390         if (IS_ERR(dto))
391                 RETURN(dto);
392
393         LASSERT(dto != NULL);
394         if (dt_object_exists(dto))
395                 RETURN(dto);
396
397         th = dt_trans_create(env, dt);
398         if (IS_ERR(th))
399                 GOTO(out, rc = PTR_ERR(th));
400
401         rc = dt_declare_create(env, dto, at, NULL, dof, th);
402         if (rc)
403                 GOTO(trans_stop, rc);
404
405         rc = dt_trans_start_local(env, dt, th);
406         if (rc)
407                 GOTO(trans_stop, rc);
408
409         dt_write_lock(env, dto, 0);
410         if (dt_object_exists(dto))
411                 GOTO(unlock, rc = 0);
412
413         CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid));
414
415         rc = dt_create(env, dto, at, NULL, dof, th);
416         if (rc)
417                 GOTO(unlock, rc);
418         LASSERT(dt_object_exists(dto));
419 unlock:
420         dt_write_unlock(env, dto);
421 trans_stop:
422         dt_trans_stop(env, dt, th);
423 out:
424         if (rc) {
425                 lu_object_put(env, &dto->do_lu);
426                 RETURN(ERR_PTR(rc));
427         }
428         RETURN(dto);
429 }
430 EXPORT_SYMBOL(dt_find_or_create);
431
432 /* dt class init function. */
433 int dt_global_init(void)
434 {
435         int result;
436
437         LU_CONTEXT_KEY_INIT(&dt_key);
438         result = lu_context_key_register(&dt_key);
439         return result;
440 }
441
442 void dt_global_fini(void)
443 {
444         lu_context_key_degister(&dt_key);
445 }
446
447 /**
448  * Generic read helper. May return an error for partial reads.
449  *
450  * \param env  lustre environment
451  * \param dt   object to be read
452  * \param buf  lu_buf to be filled, with buffer pointer and length
453  * \param pos position to start reading, updated as data is read
454  *
455  * \retval real size of data read
456  * \retval -ve errno on failure
457  */
458 int dt_read(const struct lu_env *env, struct dt_object *dt,
459             struct lu_buf *buf, loff_t *pos)
460 {
461         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
462         return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
463 }
464 EXPORT_SYMBOL(dt_read);
465
466 /**
467  * Read structures of fixed size from storage.  Unlike dt_read(), using
468  * dt_record_read() will return an error for partial reads.
469  *
470  * \param env  lustre environment
471  * \param dt   object to be read
472  * \param buf  lu_buf to be filled, with buffer pointer and length
473  * \param pos position to start reading, updated as data is read
474  *
475  * \retval 0 on successfully reading full buffer
476  * \retval -EFAULT on short read
477  * \retval -ve errno on failure
478  */
479 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
480                    struct lu_buf *buf, loff_t *pos)
481 {
482         int rc;
483
484         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
485
486         rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
487
488         if (rc == buf->lb_len)
489                 rc = 0;
490         else if (rc >= 0)
491                 rc = -EFAULT;
492         return rc;
493 }
494 EXPORT_SYMBOL(dt_record_read);
495
496 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
497                     const struct lu_buf *buf, loff_t *pos, struct thandle *th)
498 {
499         int rc;
500
501         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
502         LASSERT(th != NULL);
503         LASSERT(dt->do_body_ops);
504         LASSERT(dt->do_body_ops->dbo_write);
505         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
506         if (rc == buf->lb_len)
507                 rc = 0;
508         else if (rc >= 0)
509                 rc = -EFAULT;
510         return rc;
511 }
512 EXPORT_SYMBOL(dt_record_write);
513
514 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
515                            struct thandle *th)
516 {
517         struct lu_buf vbuf;
518         char *xname = XATTR_NAME_VERSION;
519
520         LASSERT(o);
521         vbuf.lb_buf = NULL;
522         vbuf.lb_len = sizeof(dt_obj_version_t);
523         return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
524
525 }
526 EXPORT_SYMBOL(dt_declare_version_set);
527
528 void dt_version_set(const struct lu_env *env, struct dt_object *o,
529                     dt_obj_version_t version, struct thandle *th)
530 {
531         struct lu_buf vbuf;
532         char *xname = XATTR_NAME_VERSION;
533         int rc;
534
535         LASSERT(o);
536         vbuf.lb_buf = &version;
537         vbuf.lb_len = sizeof(version);
538
539         rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
540         if (rc < 0)
541                 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
542         return;
543 }
544 EXPORT_SYMBOL(dt_version_set);
545
546 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
547 {
548         struct lu_buf vbuf;
549         char *xname = XATTR_NAME_VERSION;
550         dt_obj_version_t version;
551         int rc;
552
553         LASSERT(o);
554         vbuf.lb_buf = &version;
555         vbuf.lb_len = sizeof(version);
556         rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
557         if (rc != sizeof(version)) {
558                 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
559                 version = 0;
560         }
561         return version;
562 }
563 EXPORT_SYMBOL(dt_version_get);
564
565 const struct dt_index_features dt_directory_features;
566 EXPORT_SYMBOL(dt_directory_features);