Whamcloud - gitweb
LU-911 obdclass: new context tags for future use
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/obdclass/dt_object.c
39  *
40  * Dt Object.
41  * Generic functions from dt_object.h
42  *
43  * Author: Nikita Danilov <nikita@clusterfs.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_CLASS
47 #ifndef EXPORT_SYMTAB
48 # define EXPORT_SYMTAB
49 #endif
50
51 #include <obd.h>
52 #include <dt_object.h>
53 #include <libcfs/list.h>
54 /* fid_be_to_cpu() */
55 #include <lustre_fid.h>
56
57 struct dt_find_hint {
58         struct lu_fid        *dfh_fid;
59         struct dt_device     *dfh_dt;
60         struct dt_object     *dfh_o;
61 };
62
63 struct dt_thread_info {
64         char                    dti_buf[DT_MAX_PATH];
65         struct dt_find_hint     dti_dfh;
66 };
67
68 /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
69 LU_KEY_INIT(dt_global, struct dt_thread_info);
70 LU_KEY_FINI(dt_global, struct dt_thread_info);
71
72 static struct lu_context_key dt_key = {
73         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
74         .lct_init = dt_global_key_init,
75         .lct_fini = dt_global_key_fini
76 };
77
78 /* no lock is necessary to protect the list, because call-backs
79  * are added during system startup. Please refer to "struct dt_device".
80  */
81 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
82 {
83         cfs_list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
84 }
85 EXPORT_SYMBOL(dt_txn_callback_add);
86
87 void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
88 {
89         cfs_list_del_init(&cb->dtc_linkage);
90 }
91 EXPORT_SYMBOL(dt_txn_callback_del);
92
93 int dt_txn_hook_start(const struct lu_env *env,
94                       struct dt_device *dev, struct thandle *th)
95 {
96         int rc = 0;
97         struct dt_txn_callback *cb;
98
99         if (th->th_local)
100                 return 0;
101
102         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
103                 if (cb->dtc_txn_start == NULL ||
104                     !(cb->dtc_tag & env->le_ctx.lc_tags))
105                         continue;
106                 rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
107                 if (rc < 0)
108                         break;
109         }
110         return rc;
111 }
112 EXPORT_SYMBOL(dt_txn_hook_start);
113
114 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
115 {
116         struct dt_device       *dev = txn->th_dev;
117         struct dt_txn_callback *cb;
118         int                     rc = 0;
119
120         if (txn->th_local)
121                 return 0;
122
123         cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
124                 if (cb->dtc_txn_stop == NULL ||
125                     !(cb->dtc_tag & env->le_ctx.lc_tags))
126                         continue;
127                 rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
128                 if (rc < 0)
129                         break;
130         }
131         return rc;
132 }
133 EXPORT_SYMBOL(dt_txn_hook_stop);
134
135 void dt_txn_hook_commit(struct thandle *txn)
136 {
137         struct dt_txn_callback *cb;
138
139         if (txn->th_local)
140                 return;
141
142         cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
143                                 dtc_linkage) {
144                 if (cb->dtc_txn_commit)
145                         cb->dtc_txn_commit(txn, cb->dtc_cookie);
146         }
147 }
148 EXPORT_SYMBOL(dt_txn_hook_commit);
149
150 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
151 {
152
153         CFS_INIT_LIST_HEAD(&dev->dd_txn_callbacks);
154         return lu_device_init(&dev->dd_lu_dev, t);
155 }
156 EXPORT_SYMBOL(dt_device_init);
157
158 void dt_device_fini(struct dt_device *dev)
159 {
160         lu_device_fini(&dev->dd_lu_dev);
161 }
162 EXPORT_SYMBOL(dt_device_fini);
163
164 int dt_object_init(struct dt_object *obj,
165                    struct lu_object_header *h, struct lu_device *d)
166
167 {
168         return lu_object_init(&obj->do_lu, h, d);
169 }
170 EXPORT_SYMBOL(dt_object_init);
171
172 void dt_object_fini(struct dt_object *obj)
173 {
174         lu_object_fini(&obj->do_lu);
175 }
176 EXPORT_SYMBOL(dt_object_fini);
177
178 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
179 {
180         if (obj->do_index_ops == NULL)
181                 obj->do_ops->do_index_try(env, obj, &dt_directory_features);
182         return obj->do_index_ops != NULL;
183 }
184 EXPORT_SYMBOL(dt_try_as_dir);
185
186 enum dt_format_type dt_mode_to_dft(__u32 mode)
187 {
188         enum dt_format_type result;
189
190         switch (mode & S_IFMT) {
191         case S_IFDIR:
192                 result = DFT_DIR;
193                 break;
194         case S_IFREG:
195                 result = DFT_REGULAR;
196                 break;
197         case S_IFLNK:
198                 result = DFT_SYM;
199                 break;
200         case S_IFCHR:
201         case S_IFBLK:
202         case S_IFIFO:
203         case S_IFSOCK:
204                 result = DFT_NODE;
205                 break;
206         default:
207                 LBUG();
208                 break;
209         }
210         return result;
211 }
212 EXPORT_SYMBOL(dt_mode_to_dft);
213
214 /**
215  * lookup fid for object named \a name in directory \a dir.
216  */
217
218 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
219                   const char *name, struct lu_fid *fid)
220 {
221         if (dt_try_as_dir(env, dir))
222                 return dt_lookup(env, dir, (struct dt_rec *)fid,
223                                  (const struct dt_key *)name, BYPASS_CAPA);
224         return -ENOTDIR;
225 }
226 EXPORT_SYMBOL(dt_lookup_dir);
227 /**
228  * get object for given \a fid.
229  */
230 struct dt_object *dt_locate(const struct lu_env *env,
231                             struct dt_device *dev,
232                             const struct lu_fid *fid)
233 {
234         struct lu_object *obj;
235         struct dt_object *dt;
236
237         obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL);
238         if (!IS_ERR(obj)) {
239                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
240                 LASSERT(obj != NULL);
241                 dt = container_of(obj, struct dt_object, do_lu);
242         } else
243                 dt = (struct dt_object *)obj;
244         return dt;
245 }
246 EXPORT_SYMBOL(dt_locate);
247
248 /**
249  * find a object named \a entry in given \a dfh->dfh_o directory.
250  */
251 static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
252 {
253         struct dt_find_hint  *dfh = data;
254         struct dt_device     *dt = dfh->dfh_dt;
255         struct lu_fid        *fid = dfh->dfh_fid;
256         struct dt_object     *obj = dfh->dfh_o;
257         int                   result;
258
259         result = dt_lookup_dir(env, obj, entry, fid);
260         lu_object_put(env, &obj->do_lu);
261         if (result == 0) {
262                 obj = dt_locate(env, dt, fid);
263                 if (IS_ERR(obj))
264                         result = PTR_ERR(obj);
265         }
266         dfh->dfh_o = obj;
267         return result;
268 }
269
270 /**
271  * Abstract function which parses path name. This function feeds
272  * path component to \a entry_func.
273  */
274 int dt_path_parser(const struct lu_env *env,
275                    char *path, dt_entry_func_t entry_func,
276                    void *data)
277 {
278         char *e;
279         int rc = 0;
280
281         while (1) {
282                 e = strsep(&path, "/");
283                 if (e == NULL)
284                         break;
285
286                 if (e[0] == 0) {
287                         if (!path || path[0] == '\0')
288                                 break;
289                         continue;
290                 }
291                 rc = entry_func(env, e, data);
292                 if (rc)
293                         break;
294         }
295
296         return rc;
297 }
298
299 static struct dt_object *dt_store_resolve(const struct lu_env *env,
300                                           struct dt_device *dt,
301                                           const char *path,
302                                           struct lu_fid *fid)
303 {
304         struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
305                                                          &dt_key);
306         struct dt_find_hint *dfh = &info->dti_dfh;
307         struct dt_object     *obj;
308         char *local = info->dti_buf;
309         int result;
310
311         dfh->dfh_dt = dt;
312         dfh->dfh_fid = fid;
313
314         strncpy(local, path, DT_MAX_PATH);
315         local[DT_MAX_PATH - 1] = '\0';
316
317         result = dt->dd_ops->dt_root_get(env, dt, fid);
318         if (result == 0) {
319                 obj = dt_locate(env, dt, fid);
320                 if (!IS_ERR(obj)) {
321                         dfh->dfh_o = obj;
322                         result = dt_path_parser(env, local, dt_find_entry, dfh);
323                         if (result != 0)
324                                 obj = ERR_PTR(result);
325                         else
326                                 obj = dfh->dfh_o;
327                 }
328         } else {
329                 obj = ERR_PTR(result);
330         }
331         return obj;
332 }
333
334 static struct dt_object *dt_reg_open(const struct lu_env *env,
335                                      struct dt_device *dt,
336                                      struct dt_object *p,
337                                      const char *name,
338                                      struct lu_fid *fid)
339 {
340         struct dt_object *o;
341         int result;
342
343         result = dt_lookup_dir(env, p, name, fid);
344         if (result == 0){
345                 o = dt_locate(env, dt, fid);
346         }
347         else
348                 o = ERR_PTR(result);
349
350         return o;
351 }
352
353 /**
354  * Open dt object named \a filename from \a dirname directory.
355  *      \param  dt      dt device
356  *      \param  fid     on success, object fid is stored in *fid
357  */
358 struct dt_object *dt_store_open(const struct lu_env *env,
359                                 struct dt_device *dt,
360                                 const char *dirname,
361                                 const char *filename,
362                                 struct lu_fid *fid)
363 {
364         struct dt_object *file;
365         struct dt_object *dir;
366
367         dir = dt_store_resolve(env, dt, dirname, fid);
368         if (!IS_ERR(dir)) {
369                 file = dt_reg_open(env, dt, dir,
370                                    filename, fid);
371                 lu_object_put(env, &dir->do_lu);
372         } else {
373                 file = dir;
374         }
375         return file;
376 }
377 EXPORT_SYMBOL(dt_store_open);
378
379 /* dt class init function. */
380 int dt_global_init(void)
381 {
382         int result;
383
384         LU_CONTEXT_KEY_INIT(&dt_key);
385         result = lu_context_key_register(&dt_key);
386         return result;
387 }
388
389 void dt_global_fini(void)
390 {
391         lu_context_key_degister(&dt_key);
392 }
393
394 /**
395  * Generic read helper. May return an error for partial reads.
396  *
397  * \param env  lustre environment
398  * \param dt   object to be read
399  * \param buf  lu_buf to be filled, with buffer pointer and length
400  * \param pos position to start reading, updated as data is read
401  *
402  * \retval real size of data read
403  * \retval -ve errno on failure
404  */
405 int dt_read(const struct lu_env *env, struct dt_object *dt,
406             struct lu_buf *buf, loff_t *pos)
407 {
408         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
409         return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
410 }
411 EXPORT_SYMBOL(dt_read);
412
413 /**
414  * Read structures of fixed size from storage.  Unlike dt_read(), using
415  * dt_record_read() will return an error for partial reads.
416  *
417  * \param env  lustre environment
418  * \param dt   object to be read
419  * \param buf  lu_buf to be filled, with buffer pointer and length
420  * \param pos position to start reading, updated as data is read
421  *
422  * \retval 0 on successfully reading full buffer
423  * \retval -EFAULT on short read
424  * \retval -ve errno on failure
425  */
426 int dt_record_read(const struct lu_env *env, struct dt_object *dt,
427                    struct lu_buf *buf, loff_t *pos)
428 {
429         int rc;
430
431         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
432
433         rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
434
435         if (rc == buf->lb_len)
436                 rc = 0;
437         else if (rc >= 0)
438                 rc = -EFAULT;
439         return rc;
440 }
441 EXPORT_SYMBOL(dt_record_read);
442
443 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
444                     const struct lu_buf *buf, loff_t *pos, struct thandle *th)
445 {
446         int rc;
447
448         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
449         LASSERT(th != NULL);
450         LASSERT(dt->do_body_ops);
451         LASSERT(dt->do_body_ops->dbo_write);
452         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
453         if (rc == buf->lb_len)
454                 rc = 0;
455         else if (rc >= 0)
456                 rc = -EFAULT;
457         return rc;
458 }
459 EXPORT_SYMBOL(dt_record_write);
460
461 int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
462                            struct thandle *th)
463 {
464         struct lu_buf vbuf;
465         char *xname = XATTR_NAME_VERSION;
466
467         LASSERT(o);
468         vbuf.lb_buf = NULL;
469         vbuf.lb_len = sizeof(dt_obj_version_t);
470         return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
471
472 }
473 EXPORT_SYMBOL(dt_declare_version_set);
474
475 void dt_version_set(const struct lu_env *env, struct dt_object *o,
476                     dt_obj_version_t version, struct thandle *th)
477 {
478         struct lu_buf vbuf;
479         char *xname = XATTR_NAME_VERSION;
480         int rc;
481
482         LASSERT(o);
483         vbuf.lb_buf = &version;
484         vbuf.lb_len = sizeof(version);
485
486         rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
487         if (rc < 0)
488                 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
489         return;
490 }
491 EXPORT_SYMBOL(dt_version_set);
492
493 dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
494 {
495         struct lu_buf vbuf;
496         char *xname = XATTR_NAME_VERSION;
497         dt_obj_version_t version;
498         int rc;
499
500         LASSERT(o);
501         vbuf.lb_buf = &version;
502         vbuf.lb_len = sizeof(version);
503         rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
504         if (rc != sizeof(version)) {
505                 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
506                 version = 0;
507         }
508         return version;
509 }
510 EXPORT_SYMBOL(dt_version_get);
511
512 const struct dt_index_features dt_directory_features;
513 EXPORT_SYMBOL(dt_directory_features);