Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_object.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.tappro@intel.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include "osp_internal.h"
50
51 static __u64 osp_object_assign_id(const struct lu_env *env,
52                                   struct osp_device *d, struct osp_object *o)
53 {
54         struct osp_thread_info  *osi = osp_env_info(env);
55         const struct lu_fid     *f = lu_object_fid(&o->opo_obj.do_lu);
56
57         LASSERT(fid_is_zero(f));
58         LASSERT(o->opo_reserved);
59         o->opo_reserved = 0;
60
61         /* assign fid to anonymous object */
62         osi->osi_oi.oi_id = osp_precreate_get_id(d);
63         osi->osi_oi.oi_seq = FID_SEQ_OST_MDT0;
64         fid_ostid_unpack(&osi->osi_fid, &osi->osi_oi, d->opd_index);
65         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
66
67         return osi->osi_oi.oi_id;
68 }
69
70 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
71                                 const struct lu_attr *attr, struct thandle *th)
72 {
73         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
74         struct osp_object       *o = dt2osp_obj(dt);
75         int                      rc = 0;
76
77         ENTRY;
78
79         /*
80          * Usually we don't allow server stack to manipulate size
81          * but there is a special case when striping is created
82          * late, after stripless file got truncated to non-zero.
83          *
84          * In this case we do the following:
85          *
86          * 1) grab id in declare - this can lead to leaked OST objects
87          *    but we don't currently have proper mechanism and the only
88          *    options we have are to do truncate RPC holding transaction
89          *    open (very bad) or to grab id in declare at cost of leaked
90          *    OST object in same very rare unfortunate case (just bad)
91          *    notice 1.6-2.0 do assignment outside of running transaction
92          *    all the time, meaning many more chances for leaked objects.
93          *
94          * 2) send synchronous truncate RPC with just assigned id
95          */
96
97         /* there are few places in MDD code still passing NULL
98          * XXX: to be fixed soon */
99         if (attr == NULL)
100                 RETURN(0);
101
102         if (attr->la_valid & LA_SIZE && attr->la_size > 0) {
103                 LASSERT(!dt_object_exists(dt));
104                 osp_object_assign_id(env, d, o);
105                 rc = osp_object_truncate(env, dt, attr->la_size);
106                 if (rc)
107                         RETURN(rc);
108         }
109
110         if (o->opo_new) {
111                 /* no need in logging for new objects being created */
112                 RETURN(0);
113         }
114
115         if (!(attr->la_valid & (LA_UID | LA_GID)))
116                 RETURN(0);
117
118         /*
119          * track all UID/GID changes via llog
120          */
121         rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
122
123         RETURN(rc);
124 }
125
126 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
127                         const struct lu_attr *attr, struct thandle *th,
128                         struct lustre_capa *capa)
129 {
130         struct osp_object       *o = dt2osp_obj(dt);
131         int                      rc = 0;
132
133         ENTRY;
134
135         /* we're interested in uid/gid changes only */
136         if (!(attr->la_valid & (LA_UID | LA_GID)))
137                 RETURN(0);
138
139         /* new object, the very first ->attr_set()
140          * initializing attributes needs no logging
141          * all subsequent one are subject to the
142          * logging and synchronization with OST */
143         if (o->opo_new) {
144                 o->opo_new = 0;
145                 RETURN(0);
146         }
147
148         /*
149          * once transaction is committed put proper command on
150          * the queue going to our OST
151          */
152         rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
153
154         /* XXX: send new uid/gid to OST ASAP? */
155
156         RETURN(rc);
157 }
158
159 static int osp_declare_object_create(const struct lu_env *env,
160                                      struct dt_object *dt,
161                                      struct lu_attr *attr,
162                                      struct dt_allocation_hint *hint,
163                                      struct dt_object_format *dof,
164                                      struct thandle *th)
165 {
166         struct osp_thread_info  *osi = osp_env_info(env);
167         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
168         struct osp_object       *o = dt2osp_obj(dt);
169         const struct lu_fid     *fid;
170         int                      rc = 0;
171
172         ENTRY;
173
174         /* should happen to non-0 OSP only so that at least one object
175          * has been already declared in the scenario and LOD should
176          * cleanup that */
177         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
178                 RETURN(-ENOSPC);
179
180         LASSERT(d->opd_last_used_file);
181         fid = lu_object_fid(&dt->do_lu);
182
183         /*
184          * There can be gaps in precreated ids and record to unlink llog
185          * XXX: we do not handle gaps yet, implemented before solution
186          *      was found to be racy, so we disabled that. there is no
187          *      point in making useless but expensive llog declaration.
188          */
189         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
190
191         if (unlikely(!fid_is_zero(fid))) {
192                 /* replay case: caller knows fid */
193                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
194                 rc = dt_declare_record_write(env, d->opd_last_used_file,
195                                              sizeof(osi->osi_id), osi->osi_off,
196                                              th);
197                 RETURN(rc);
198         }
199
200         /*
201          * in declaration we need to reserve object so that we don't block
202          * awaiting precreation RPC to complete
203          */
204         rc = osp_precreate_reserve(env, d);
205         /*
206          * we also need to declare update to local "last used id" file for
207          * recovery if object isn't used for a reason, we need to release
208          * reservation, this can be made in osd_object_release()
209          */
210         if (rc == 0) {
211                 /* mark id is reserved: in create we don't want to talk
212                  * to OST */
213                 LASSERT(o->opo_reserved == 0);
214                 o->opo_reserved = 1;
215
216                 /* common for all OSPs file hystorically */
217                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
218                 rc = dt_declare_record_write(env, d->opd_last_used_file,
219                                              sizeof(osi->osi_id), osi->osi_off,
220                                              th);
221         } else {
222                 /* not needed in the cache anymore */
223                 set_bit(LU_OBJECT_HEARD_BANSHEE,
224                             &dt->do_lu.lo_header->loh_flags);
225         }
226         RETURN(rc);
227 }
228
229 static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
230                              struct lu_attr *attr,
231                              struct dt_allocation_hint *hint,
232                              struct dt_object_format *dof, struct thandle *th)
233 {
234         struct osp_thread_info  *osi = osp_env_info(env);
235         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
236         struct osp_object       *o = dt2osp_obj(dt);
237         int                      rc = 0;
238
239         ENTRY;
240
241         if (o->opo_reserved) {
242                 /* regular case, id is assigned holding transaction open */
243                 osi->osi_id = osp_object_assign_id(env, d, o);
244         } else {
245                 /* special case, id was assigned outside of transaction
246                  * see comments in osp_declare_attr_set */
247                 rc = fid_ostid_pack(lu_object_fid(&dt->do_lu), &osi->osi_oi);
248                 LASSERT(rc == 0);
249                 osi->osi_id = ostid_id(&osi->osi_oi);
250                 spin_lock(&d->opd_pre_lock);
251                 osp_update_last_id(d, osi->osi_id);
252                 spin_unlock(&d->opd_pre_lock);
253         }
254
255         LASSERT(osi->osi_id);
256
257         /*
258          * it's OK if the import is inactive by this moment - id was created
259          * by OST earlier, we just need to maintain it consistently on the disk
260          * once import is reconnected, OSP will claim this and other objects
261          * used and OST either keep them, if they exist or recreate
262          */
263
264         /* we might have lost precreated objects */
265         if (unlikely(d->opd_gap_count) > 0) {
266                 spin_lock(&d->opd_pre_lock);
267                 if (d->opd_gap_count > 0) {
268                         int count = d->opd_gap_count;
269
270                         osi->osi_oi.oi_id = d->opd_gap_start;
271                         d->opd_gap_count = 0;
272                         spin_unlock(&d->opd_pre_lock);
273
274                         CDEBUG(D_HA, "Found gap "LPU64"+%d in objids\n",
275                                d->opd_gap_start, count);
276                         /* real gap handling is disabled intil ORI-692 will be
277                          * fixed, now we only report gaps */
278                 } else {
279                         spin_unlock(&d->opd_pre_lock);
280                 }
281         }
282
283         /* new object, the very first ->attr_set()
284          * initializing attributes needs no logging */
285         o->opo_new = 1;
286
287         osp_objid_buf_prep(osi, d, d->opd_index);
288         rc = dt_record_write(env, d->opd_last_used_file, &osi->osi_lb,
289                              &osi->osi_off, th);
290
291         CDEBUG(D_HA, "%s: Wrote last used ID: "LPU64": %d\n",
292                d->opd_obd->obd_name, le64_to_cpu(d->opd_last_used_id), rc);
293
294         RETURN(rc);
295 }
296
297 static int osp_declare_object_destroy(const struct lu_env *env,
298                                       struct dt_object *dt, struct thandle *th)
299 {
300         struct osp_object       *o = dt2osp_obj(dt);
301         int                      rc = 0;
302
303         ENTRY;
304
305         /*
306          * track objects to be destroyed via llog
307          */
308         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
309
310         RETURN(rc);
311 }
312
313 static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
314                               struct thandle *th)
315 {
316         struct osp_object       *o = dt2osp_obj(dt);
317         int                      rc = 0;
318
319         ENTRY;
320
321         /*
322          * once transaction is committed put proper command on
323          * the queue going to our OST
324          */
325         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
326
327         /* not needed in cache any more */
328         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
329
330         RETURN(rc);
331 }
332
333 struct dt_object_operations osp_obj_ops = {
334         .do_declare_attr_set    = osp_declare_attr_set,
335         .do_attr_set            = osp_attr_set,
336         .do_declare_create      = osp_declare_object_create,
337         .do_create              = osp_object_create,
338         .do_declare_destroy     = osp_declare_object_destroy,
339         .do_destroy             = osp_object_destroy,
340 };
341
342 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
343                            const struct lu_object_conf *unused)
344 {
345         struct osp_object *po = lu2osp_obj(o);
346
347         po->opo_obj.do_ops = &osp_obj_ops;
348
349         return 0;
350 }
351
352 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
353 {
354         struct osp_object       *obj = lu2osp_obj(o);
355         struct lu_object_header *h = o->lo_header;
356
357         dt_object_fini(&obj->opo_obj);
358         lu_object_header_fini(h);
359         OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
360 }
361
362 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
363 {
364         struct osp_object       *po = lu2osp_obj(o);
365         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
366
367         ENTRY;
368
369         /*
370          * release reservation if object was declared but not created
371          * this may require lu_object_put() in LOD
372          */
373         if (unlikely(po->opo_reserved)) {
374                 LASSERT(d->opd_pre_reserved > 0);
375                 spin_lock(&d->opd_pre_lock);
376                 d->opd_pre_reserved--;
377                 spin_unlock(&d->opd_pre_lock);
378
379                 /* not needed in cache any more */
380                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
381         }
382         EXIT;
383 }
384
385 static int osp_object_print(const struct lu_env *env, void *cookie,
386                             lu_printer_t p, const struct lu_object *l)
387 {
388         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
389
390         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
391 }
392
393 static int osp_object_invariant(const struct lu_object *o)
394 {
395         LBUG();
396 }
397
398 struct lu_object_operations osp_lu_obj_ops = {
399         .loo_object_init        = osp_object_init,
400         .loo_object_free        = osp_object_free,
401         .loo_object_release     = osp_object_release,
402         .loo_object_print       = osp_object_print,
403         .loo_object_invariant   = osp_object_invariant
404 };