Whamcloud - gitweb
LU-1445 osp: Use FID to track precreate cache.
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_object.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.tappro@intel.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include "osp_internal.h"
50
51 static void osp_object_assign_fid(const struct lu_env *env,
52                                  struct osp_device *d, struct osp_object *o)
53 {
54         struct osp_thread_info *osi = osp_env_info(env);
55
56         LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
57         LASSERT(o->opo_reserved);
58         o->opo_reserved = 0;
59
60         osp_precreate_get_fid(env, d, &osi->osi_fid);
61
62         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
63 }
64
65 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
66                                 const struct lu_attr *attr, struct thandle *th)
67 {
68         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
69         struct osp_object       *o = dt2osp_obj(dt);
70         int                      rc = 0;
71
72         ENTRY;
73
74         /*
75          * Usually we don't allow server stack to manipulate size
76          * but there is a special case when striping is created
77          * late, after stripless file got truncated to non-zero.
78          *
79          * In this case we do the following:
80          *
81          * 1) grab id in declare - this can lead to leaked OST objects
82          *    but we don't currently have proper mechanism and the only
83          *    options we have are to do truncate RPC holding transaction
84          *    open (very bad) or to grab id in declare at cost of leaked
85          *    OST object in same very rare unfortunate case (just bad)
86          *    notice 1.6-2.0 do assignment outside of running transaction
87          *    all the time, meaning many more chances for leaked objects.
88          *
89          * 2) send synchronous truncate RPC with just assigned id
90          */
91
92         /* there are few places in MDD code still passing NULL
93          * XXX: to be fixed soon */
94         if (attr == NULL)
95                 RETURN(0);
96
97         if (attr->la_valid & LA_SIZE && attr->la_size > 0) {
98                 LASSERT(!dt_object_exists(dt));
99                 osp_object_assign_fid(env, d, o);
100                 rc = osp_object_truncate(env, dt, attr->la_size);
101                 if (rc)
102                         RETURN(rc);
103         }
104
105         if (o->opo_new) {
106                 /* no need in logging for new objects being created */
107                 RETURN(0);
108         }
109
110         if (!(attr->la_valid & (LA_UID | LA_GID)))
111                 RETURN(0);
112
113         /*
114          * track all UID/GID changes via llog
115          */
116         rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
117
118         RETURN(rc);
119 }
120
121 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
122                         const struct lu_attr *attr, struct thandle *th,
123                         struct lustre_capa *capa)
124 {
125         struct osp_object       *o = dt2osp_obj(dt);
126         int                      rc = 0;
127
128         ENTRY;
129
130         /* we're interested in uid/gid changes only */
131         if (!(attr->la_valid & (LA_UID | LA_GID)))
132                 RETURN(0);
133
134         /* new object, the very first ->attr_set()
135          * initializing attributes needs no logging
136          * all subsequent one are subject to the
137          * logging and synchronization with OST */
138         if (o->opo_new) {
139                 o->opo_new = 0;
140                 RETURN(0);
141         }
142
143         /*
144          * once transaction is committed put proper command on
145          * the queue going to our OST
146          */
147         rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
148
149         /* XXX: send new uid/gid to OST ASAP? */
150
151         RETURN(rc);
152 }
153
154 static int osp_declare_object_create(const struct lu_env *env,
155                                      struct dt_object *dt,
156                                      struct lu_attr *attr,
157                                      struct dt_allocation_hint *hint,
158                                      struct dt_object_format *dof,
159                                      struct thandle *th)
160 {
161         struct osp_thread_info  *osi = osp_env_info(env);
162         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
163         struct osp_object       *o = dt2osp_obj(dt);
164         const struct lu_fid     *fid;
165         int                      rc = 0;
166
167         ENTRY;
168
169         /* should happen to non-0 OSP only so that at least one object
170          * has been already declared in the scenario and LOD should
171          * cleanup that */
172         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
173                 RETURN(-ENOSPC);
174
175         LASSERT(d->opd_last_used_oid_file);
176         fid = lu_object_fid(&dt->do_lu);
177
178         /*
179          * There can be gaps in precreated ids and record to unlink llog
180          * XXX: we do not handle gaps yet, implemented before solution
181          *      was found to be racy, so we disabled that. there is no
182          *      point in making useless but expensive llog declaration.
183          */
184         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
185
186         if (unlikely(!fid_is_zero(fid))) {
187                 /* replay case: caller knows fid */
188                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
189                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
190                                              sizeof(osi->osi_id), osi->osi_off,
191                                              th);
192                 RETURN(rc);
193         }
194
195         /*
196          * in declaration we need to reserve object so that we don't block
197          * awaiting precreation RPC to complete
198          */
199         rc = osp_precreate_reserve(env, d);
200         /*
201          * we also need to declare update to local "last used id" file for
202          * recovery if object isn't used for a reason, we need to release
203          * reservation, this can be made in osd_object_release()
204          */
205         if (rc == 0) {
206                 /* mark id is reserved: in create we don't want to talk
207                  * to OST */
208                 LASSERT(o->opo_reserved == 0);
209                 o->opo_reserved = 1;
210
211                 /* common for all OSPs file hystorically */
212                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
213                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
214                                              sizeof(osi->osi_id), osi->osi_off,
215                                              th);
216         } else {
217                 /* not needed in the cache anymore */
218                 set_bit(LU_OBJECT_HEARD_BANSHEE,
219                             &dt->do_lu.lo_header->loh_flags);
220         }
221         RETURN(rc);
222 }
223
224 static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
225                              struct lu_attr *attr,
226                              struct dt_allocation_hint *hint,
227                              struct dt_object_format *dof, struct thandle *th)
228 {
229         struct osp_thread_info  *osi = osp_env_info(env);
230         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
231         struct osp_object       *o = dt2osp_obj(dt);
232         int                     rc = 0;
233         struct lu_fid           *fid = &osi->osi_fid;
234         ENTRY;
235
236         if (o->opo_reserved) {
237                 /* regular case, fid is assigned holding trunsaction open */
238                  osp_object_assign_fid(env, d, o);
239         }
240
241         memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
242
243         LASSERTF(fid_is_sane(fid), "fid for osp_obj %p is insane"DFID"!\n",
244                  osp_obj, PFID(fid));
245
246         if (!o->opo_reserved) {
247                 /* special case, id was assigned outside of transaction
248                  * see comments in osp_declare_attr_set */
249                 spin_lock(&d->opd_pre_lock);
250                 osp_update_last_fid(d, fid);
251                 spin_unlock(&d->opd_pre_lock);
252         }
253
254         CDEBUG(D_INODE, "fid for osp_obj %p is "DFID"!\n", osp_obj, PFID(fid));
255
256         /*
257          * it's OK if the import is inactive by this moment - id was created
258          * by OST earlier, we just need to maintain it consistently on the disk
259          * once import is reconnected, OSP will claim this and other objects
260          * used and OST either keep them, if they exist or recreate
261          */
262
263         /* we might have lost precreated objects */
264         if (unlikely(d->opd_gap_count) > 0) {
265                 spin_lock(&d->opd_pre_lock);
266                 if (d->opd_gap_count > 0) {
267                         int count = d->opd_gap_count;
268                         osi->osi_oi.oi_id = fid_oid(&d->opd_gap_start_fid);
269                         d->opd_gap_count = 0;
270                         spin_unlock(&d->opd_pre_lock);
271
272                         CDEBUG(D_HA, "Writting gap "DFID"+%d in llog\n",
273                                PFID(&d->opd_gap_start_fid), count);
274                         /* real gap handling is disabled intil ORI-692 will be
275                          * fixed, now we only report gaps */
276                 } else {
277                         spin_unlock(&d->opd_pre_lock);
278                 }
279         }
280
281         /* new object, the very first ->attr_set()
282          * initializing attributes needs no logging */
283         o->opo_new = 1;
284
285         /* Only need update last_used oid file, seq file will only be update
286          * during seq rollover */
287         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
288                            &d->opd_last_used_fid.f_oid, d->opd_index);
289
290         rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
291                              &osi->osi_off, th);
292
293         CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
294                d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
295
296         RETURN(rc);
297 }
298
299 static int osp_declare_object_destroy(const struct lu_env *env,
300                                       struct dt_object *dt, struct thandle *th)
301 {
302         struct osp_object       *o = dt2osp_obj(dt);
303         int                      rc = 0;
304
305         ENTRY;
306
307         /*
308          * track objects to be destroyed via llog
309          */
310         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
311
312         RETURN(rc);
313 }
314
315 static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
316                               struct thandle *th)
317 {
318         struct osp_object       *o = dt2osp_obj(dt);
319         int                      rc = 0;
320
321         ENTRY;
322
323         /*
324          * once transaction is committed put proper command on
325          * the queue going to our OST
326          */
327         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
328
329         /* not needed in cache any more */
330         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
331
332         RETURN(rc);
333 }
334
335 struct dt_object_operations osp_obj_ops = {
336         .do_declare_attr_set    = osp_declare_attr_set,
337         .do_attr_set            = osp_attr_set,
338         .do_declare_create      = osp_declare_object_create,
339         .do_create              = osp_object_create,
340         .do_declare_destroy     = osp_declare_object_destroy,
341         .do_destroy             = osp_object_destroy,
342 };
343
344 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
345                            const struct lu_object_conf *unused)
346 {
347         struct osp_object *po = lu2osp_obj(o);
348
349         po->opo_obj.do_ops = &osp_obj_ops;
350
351         return 0;
352 }
353
354 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
355 {
356         struct osp_object       *obj = lu2osp_obj(o);
357         struct lu_object_header *h = o->lo_header;
358
359         dt_object_fini(&obj->opo_obj);
360         lu_object_header_fini(h);
361         OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
362 }
363
364 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
365 {
366         struct osp_object       *po = lu2osp_obj(o);
367         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
368
369         ENTRY;
370
371         /*
372          * release reservation if object was declared but not created
373          * this may require lu_object_put() in LOD
374          */
375         if (unlikely(po->opo_reserved)) {
376                 LASSERT(d->opd_pre_reserved > 0);
377                 spin_lock(&d->opd_pre_lock);
378                 d->opd_pre_reserved--;
379                 spin_unlock(&d->opd_pre_lock);
380
381                 /* not needed in cache any more */
382                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
383         }
384         EXIT;
385 }
386
387 static int osp_object_print(const struct lu_env *env, void *cookie,
388                             lu_printer_t p, const struct lu_object *l)
389 {
390         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
391
392         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
393 }
394
395 static int osp_object_invariant(const struct lu_object *o)
396 {
397         LBUG();
398 }
399
400 struct lu_object_operations osp_lu_obj_ops = {
401         .loo_object_init        = osp_object_init,
402         .loo_object_free        = osp_object_free,
403         .loo_object_release     = osp_object_release,
404         .loo_object_print       = osp_object_print,
405         .loo_object_invariant   = osp_object_invariant
406 };