Whamcloud - gitweb
LU-5508 osp: RPC adjustment for remote transaction
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_object.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.tappro@intel.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include "osp_internal.h"
47
48 static inline bool is_ost_obj(struct lu_object *lo)
49 {
50         return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
51 }
52
53 static void osp_object_assign_fid(const struct lu_env *env,
54                                   struct osp_device *d, struct osp_object *o)
55 {
56         struct osp_thread_info *osi = osp_env_info(env);
57
58         LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
59         LASSERT(o->opo_reserved);
60         o->opo_reserved = 0;
61
62         osp_precreate_get_fid(env, d, &osi->osi_fid);
63
64         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
65 }
66
67 static int osp_oac_init(struct osp_object *obj)
68 {
69         struct osp_object_attr *ooa;
70
71         OBD_ALLOC_PTR(ooa);
72         if (ooa == NULL)
73                 return -ENOMEM;
74
75         INIT_LIST_HEAD(&ooa->ooa_xattr_list);
76         spin_lock(&obj->opo_lock);
77         if (likely(obj->opo_ooa == NULL)) {
78                 obj->opo_ooa = ooa;
79                 spin_unlock(&obj->opo_lock);
80         } else {
81                 spin_unlock(&obj->opo_lock);
82                 OBD_FREE_PTR(ooa);
83         }
84
85         return 0;
86 }
87
88 static struct osp_xattr_entry *
89 osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
90                           const char *name, size_t namelen)
91 {
92         struct osp_xattr_entry *oxe;
93
94         list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
95                 if (namelen == oxe->oxe_namelen &&
96                     strncmp(name, oxe->oxe_buf, namelen) == 0)
97                         return oxe;
98         }
99
100         return NULL;
101 }
102
103 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
104                                                   const char *name, bool unlink)
105 {
106         struct osp_xattr_entry *oxe = NULL;
107
108         spin_lock(&obj->opo_lock);
109         if (obj->opo_ooa != NULL) {
110                 oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
111                                                 strlen(name));
112                 if (oxe != NULL) {
113                         if (unlink)
114                                 list_del_init(&oxe->oxe_list);
115                         else
116                                 atomic_inc(&oxe->oxe_ref);
117                 }
118         }
119         spin_unlock(&obj->opo_lock);
120
121         return oxe;
122 }
123
124 static struct osp_xattr_entry *
125 osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
126 {
127         struct osp_object_attr *ooa     = obj->opo_ooa;
128         struct osp_xattr_entry *oxe;
129         struct osp_xattr_entry *tmp     = NULL;
130         size_t                  namelen = strlen(name);
131         size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
132
133         LASSERT(ooa != NULL);
134
135         oxe = osp_oac_xattr_find(obj, name, false);
136         if (oxe != NULL)
137                 return oxe;
138
139         OBD_ALLOC(oxe, size);
140         if (unlikely(oxe == NULL))
141                 return NULL;
142
143         INIT_LIST_HEAD(&oxe->oxe_list);
144         oxe->oxe_buflen = size;
145         oxe->oxe_namelen = namelen;
146         memcpy(oxe->oxe_buf, name, namelen);
147         oxe->oxe_value = oxe->oxe_buf + namelen + 1;
148         /* One ref is for the caller, the other is for the entry on the list. */
149         atomic_set(&oxe->oxe_ref, 2);
150
151         spin_lock(&obj->opo_lock);
152         tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
153         if (tmp == NULL)
154                 list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
155         else
156                 atomic_inc(&tmp->oxe_ref);
157         spin_unlock(&obj->opo_lock);
158
159         if (tmp != NULL) {
160                 OBD_FREE(oxe, size);
161                 oxe = tmp;
162         }
163
164         return oxe;
165 }
166
167 static struct osp_xattr_entry *
168 osp_oac_xattr_replace(struct osp_object *obj,
169                       struct osp_xattr_entry **poxe, size_t len)
170 {
171         struct osp_object_attr *ooa     = obj->opo_ooa;
172         struct osp_xattr_entry *oxe;
173         size_t                  namelen = (*poxe)->oxe_namelen;
174         size_t                  size    = sizeof(*oxe) + namelen + 1 + len;
175
176         LASSERT(ooa != NULL);
177
178         OBD_ALLOC(oxe, size);
179         if (unlikely(oxe == NULL))
180                 return NULL;
181
182         INIT_LIST_HEAD(&oxe->oxe_list);
183         oxe->oxe_buflen = size;
184         oxe->oxe_namelen = namelen;
185         memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
186         oxe->oxe_value = oxe->oxe_buf + namelen + 1;
187         /* One ref is for the caller, the other is for the entry on the list. */
188         atomic_set(&oxe->oxe_ref, 2);
189
190         spin_lock(&obj->opo_lock);
191         *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
192         LASSERT(*poxe != NULL);
193
194         list_del_init(&(*poxe)->oxe_list);
195         list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
196         spin_unlock(&obj->opo_lock);
197
198         return oxe;
199 }
200
201 static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
202 {
203         if (atomic_dec_and_test(&oxe->oxe_ref)) {
204                 LASSERT(list_empty(&oxe->oxe_list));
205
206                 OBD_FREE(oxe, oxe->oxe_buflen);
207         }
208 }
209
210 static int osp_get_attr_from_reply(const struct lu_env *env,
211                                    struct object_update_reply *reply,
212                                    struct ptlrpc_request *req,
213                                    struct lu_attr *attr,
214                                    struct osp_object *obj, int index)
215 {
216         struct osp_thread_info  *osi    = osp_env_info(env);
217         struct lu_buf           *rbuf   = &osi->osi_lb2;
218         struct obdo             *lobdo  = &osi->osi_obdo;
219         struct obdo             *wobdo;
220         int                     rc;
221
222         rc = object_update_result_data_get(reply, rbuf, index);
223         if (rc < 0)
224                 return rc;
225
226         wobdo = rbuf->lb_buf;
227         if (rbuf->lb_len != sizeof(*wobdo))
228                 return -EPROTO;
229
230         LASSERT(req != NULL);
231         if (ptlrpc_req_need_swab(req))
232                 lustre_swab_obdo(wobdo);
233
234         lustre_get_wire_obdo(NULL, lobdo, wobdo);
235         spin_lock(&obj->opo_lock);
236         if (obj->opo_ooa != NULL) {
237                 la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid);
238                 if (attr != NULL)
239                         *attr = obj->opo_ooa->ooa_attr;
240         } else {
241                 LASSERT(attr != NULL);
242
243                 la_from_obdo(attr, lobdo, lobdo->o_valid);
244         }
245         spin_unlock(&obj->opo_lock);
246
247         return 0;
248 }
249
250 static int osp_attr_get_interpterer(const struct lu_env *env,
251                                     struct object_update_reply *reply,
252                                     struct ptlrpc_request *req,
253                                     struct osp_object *obj,
254                                     void *data, int index, int rc)
255 {
256         struct lu_attr *attr = data;
257
258         LASSERT(obj->opo_ooa != NULL);
259
260         if (rc == 0) {
261                 osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
262                 obj->opo_non_exist = 0;
263
264                 return osp_get_attr_from_reply(env, reply, req, NULL, obj,
265                                                index);
266         } else {
267                 if (rc == -ENOENT) {
268                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
269                         obj->opo_non_exist = 1;
270                 }
271
272                 spin_lock(&obj->opo_lock);
273                 attr->la_valid = 0;
274                 spin_unlock(&obj->opo_lock);
275         }
276
277         return 0;
278 }
279
280 static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt,
281                                 struct lustre_capa *capa)
282 {
283         struct osp_object       *obj    = dt2osp_obj(dt);
284         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
285         int                      rc     = 0;
286
287         if (obj->opo_ooa == NULL) {
288                 rc = osp_oac_init(obj);
289                 if (rc != 0)
290                         return rc;
291         }
292
293         mutex_lock(&osp->opd_async_requests_mutex);
294         rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
295                                       &obj->opo_ooa->ooa_attr,
296                                       osp_attr_get_interpterer);
297         mutex_unlock(&osp->opd_async_requests_mutex);
298
299         return rc;
300 }
301
302 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
303                  struct lu_attr *attr, struct lustre_capa *capa)
304 {
305         struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
306         struct osp_object               *obj = dt2osp_obj(dt);
307         struct dt_device                *dev = &osp->opd_dt_dev;
308         struct dt_update_request        *update;
309         struct object_update_reply      *reply;
310         struct ptlrpc_request           *req = NULL;
311         int                             rc = 0;
312         ENTRY;
313
314         if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
315                 RETURN(-ENOENT);
316
317         if (obj->opo_ooa != NULL) {
318                 spin_lock(&obj->opo_lock);
319                 if (obj->opo_ooa->ooa_attr.la_valid != 0) {
320                         *attr = obj->opo_ooa->ooa_attr;
321                         spin_unlock(&obj->opo_lock);
322
323                         RETURN(0);
324                 }
325                 spin_unlock(&obj->opo_lock);
326         }
327
328         update = out_create_update_req(dev);
329         if (IS_ERR(update))
330                 RETURN(PTR_ERR(update));
331
332         rc = out_insert_update(env, update, OUT_ATTR_GET,
333                                lu_object_fid(&dt->do_lu), 0, NULL, NULL);
334         if (rc != 0) {
335                 CERROR("%s: Insert update error "DFID": rc = %d\n",
336                        dev->dd_lu_dev.ld_obd->obd_name,
337                        PFID(lu_object_fid(&dt->do_lu)), rc);
338
339                 GOTO(out, rc);
340         }
341
342         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
343         if (rc != 0) {
344                 if (rc == -ENOENT) {
345                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
346                         obj->opo_non_exist = 1;
347                 } else {
348                         CERROR("%s:osp_attr_get update error "DFID": rc = %d\n",
349                                dev->dd_lu_dev.ld_obd->obd_name,
350                                PFID(lu_object_fid(&dt->do_lu)), rc);
351                 }
352
353                 GOTO(out, rc);
354         }
355
356         osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
357         obj->opo_non_exist = 0;
358         reply = req_capsule_server_sized_get(&req->rq_pill,
359                                              &RMF_OUT_UPDATE_REPLY,
360                                              OUT_UPDATE_REPLY_SIZE);
361         if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
362                 GOTO(out, rc = -EPROTO);
363
364         rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0);
365         if (rc != 0)
366                 GOTO(out, rc);
367
368         GOTO(out, rc = 0);
369
370 out:
371         if (req != NULL)
372                 ptlrpc_req_finished(req);
373
374         out_destroy_update_req(update);
375
376         return rc;
377 }
378
379 static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
380                           const struct lu_attr *attr, struct thandle *th)
381 {
382         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
383         struct osp_object       *o = dt2osp_obj(dt);
384         struct lu_attr          *la;
385         int                      rc = 0;
386         ENTRY;
387
388         /*
389          * Usually we don't allow server stack to manipulate size
390          * but there is a special case when striping is created
391          * late, after stripless file got truncated to non-zero.
392          *
393          * In this case we do the following:
394          *
395          * 1) grab id in declare - this can lead to leaked OST objects
396          *    but we don't currently have proper mechanism and the only
397          *    options we have are to do truncate RPC holding transaction
398          *    open (very bad) or to grab id in declare at cost of leaked
399          *    OST object in same very rare unfortunate case (just bad)
400          *    notice 1.6-2.0 do assignment outside of running transaction
401          *    all the time, meaning many more chances for leaked objects.
402          *
403          * 2) send synchronous truncate RPC with just assigned id
404          */
405
406         /* there are few places in MDD code still passing NULL
407          * XXX: to be fixed soon */
408         if (attr == NULL)
409                 RETURN(0);
410
411         if (attr->la_valid & LA_SIZE && attr->la_size > 0 &&
412             fid_is_zero(lu_object_fid(&o->opo_obj.do_lu))) {
413                 LASSERT(!dt_object_exists(dt));
414                 osp_object_assign_fid(env, d, o);
415                 rc = osp_object_truncate(env, dt, attr->la_size);
416                 if (rc)
417                         RETURN(rc);
418         }
419
420         if (o->opo_new)
421                 /* no need in logging for new objects being created */
422                 RETURN(0);
423
424         if (!(attr->la_valid & (LA_UID | LA_GID)))
425                 RETURN(0);
426
427         if (!is_only_remote_trans(th))
428                 /*
429                  * track all UID/GID changes via llog
430                  */
431                 rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
432         else
433                 /* It is for OST-object attr_set directly without updating
434                  * local MDT-object attribute. It is usually used by LFSCK. */
435                 rc = osp_md_declare_attr_set(env, dt, attr, th);
436
437         if (rc != 0 || o->opo_ooa == NULL)
438                 RETURN(rc);
439
440         /* Update the OSP object attributes cache. */
441         la = &o->opo_ooa->ooa_attr;
442         spin_lock(&o->opo_lock);
443         if (attr->la_valid & LA_UID) {
444                 la->la_uid = attr->la_uid;
445                 la->la_valid |= LA_UID;
446         }
447
448         if (attr->la_valid & LA_GID) {
449                 la->la_gid = attr->la_gid;
450                 la->la_valid |= LA_GID;
451         }
452         spin_unlock(&o->opo_lock);
453
454         RETURN(0);
455 }
456
457 /**
458  * XXX: NOT prepare set_{attr,xattr} RPC for remote transaction.
459  *
460  * According to our current transaction/dt_object_lock framework (to make
461  * the cross-MDTs modification for DNE1 to be workable), the transaction
462  * sponsor will start the transaction firstly, then try to acquire related
463  * dt_object_lock if needed. Under such rules, if we want to prepare the
464  * set_{attr,xattr} RPC in the RPC declare phase, then related attr/xattr
465  * should be known without dt_object_lock. But such condition maybe not
466  * true for some remote transaction case. For example:
467  *
468  * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
469  * the dt_object_lock on the target MDT-object, it cannot know whether
470  * the MDT-object has linkEA or not, neither invalid or not.
471  *
472  * Since the LFSCK thread cannot hold dt_object_lock before the (remote)
473  * transaction start (otherwise there will be some potential deadlock),
474  * it cannot prepare related RPC for repairing during the declare phase
475  * as other normal transactions do.
476  *
477  * To resolve the trouble, we will make OSP to prepare related RPC
478  * (set_attr/set_xattr/del_xattr) after remote transaction started,
479  * and trigger the remote updating (RPC sending) when trans_stop.
480  * Then the up layer users, such as LFSCK, can follow the general
481  * rule to handle trans_start/dt_object_lock for repairing linkEA
482  * inconsistency without distinguishing remote MDT-object.
483  *
484  * In fact, above solution for remote transaction should be the normal
485  * model without considering DNE1. The trouble brought by DNE1 will be
486  * resolved in DNE2. At that time, this patch can be removed.
487  */
488 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
489                                 const struct lu_attr *attr, struct thandle *th)
490 {
491         int rc = 0;
492
493         if (!is_only_remote_trans(th))
494                 rc = __osp_attr_set(env, dt, attr, th);
495
496         return rc;
497 }
498
499 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
500                         const struct lu_attr *attr, struct thandle *th,
501                         struct lustre_capa *capa)
502 {
503         struct osp_object       *o = dt2osp_obj(dt);
504         int                      rc = 0;
505         ENTRY;
506
507         if (is_only_remote_trans(th)) {
508                 rc = __osp_attr_set(env, dt, attr, th);
509                 if (rc != 0)
510                         RETURN(rc);
511         }
512
513         /* we're interested in uid/gid changes only */
514         if (!(attr->la_valid & (LA_UID | LA_GID)))
515                 RETURN(0);
516
517         /* new object, the very first ->attr_set()
518          * initializing attributes needs no logging
519          * all subsequent one are subject to the
520          * logging and synchronization with OST */
521         if (o->opo_new) {
522                 o->opo_new = 0;
523                 RETURN(0);
524         }
525
526         if (!is_only_remote_trans(th))
527                 /*
528                  * once transaction is committed put proper command on
529                  * the queue going to our OST
530                  */
531                 rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
532                 /* XXX: send new uid/gid to OST ASAP? */
533         else
534                 /* It is for OST-object attr_set directly without updating
535                  * local MDT-object attribute. It is usually used by LFSCK. */
536                 rc = osp_md_attr_set(env, dt, attr, th, capa);
537
538         RETURN(rc);
539 }
540
541 static int osp_xattr_get_interpterer(const struct lu_env *env,
542                                      struct object_update_reply *reply,
543                                      struct ptlrpc_request *req,
544                                      struct osp_object *obj,
545                                      void *data, int index, int rc)
546 {
547         struct osp_object_attr  *ooa  = obj->opo_ooa;
548         struct osp_xattr_entry  *oxe  = data;
549         struct lu_buf           *rbuf = &osp_env_info(env)->osi_lb2;
550
551         LASSERT(ooa != NULL);
552
553         if (rc == 0) {
554                 size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1;
555
556                 rc = object_update_result_data_get(reply, rbuf, index);
557                 if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
558                         spin_lock(&obj->opo_lock);
559                         oxe->oxe_ready = 0;
560                         spin_unlock(&obj->opo_lock);
561                         osp_oac_xattr_put(oxe);
562
563                         return rc < 0 ? rc : -ERANGE;
564                 }
565
566                 spin_lock(&obj->opo_lock);
567                 oxe->oxe_vallen = rbuf->lb_len;
568                 memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
569                 oxe->oxe_exist = 1;
570                 oxe->oxe_ready = 1;
571                 spin_unlock(&obj->opo_lock);
572         } else if (rc == -ENOENT || rc == -ENODATA) {
573                 spin_lock(&obj->opo_lock);
574                 oxe->oxe_exist = 0;
575                 oxe->oxe_ready = 1;
576                 spin_unlock(&obj->opo_lock);
577         } else {
578                 spin_lock(&obj->opo_lock);
579                 oxe->oxe_ready = 0;
580                 spin_unlock(&obj->opo_lock);
581         }
582
583         osp_oac_xattr_put(oxe);
584
585         return 0;
586 }
587
588 static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
589                                  struct lu_buf *buf, const char *name,
590                                  struct lustre_capa *capa)
591 {
592         struct osp_object       *obj     = dt2osp_obj(dt);
593         struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
594         struct osp_xattr_entry  *oxe;
595         int                      namelen = strlen(name);
596         int                      rc      = 0;
597
598         LASSERT(buf != NULL);
599         LASSERT(name != NULL);
600
601         /* If only for xattr size, return directly. */
602         if (unlikely(buf->lb_len == 0))
603                 return 0;
604
605         if (obj->opo_ooa == NULL) {
606                 rc = osp_oac_init(obj);
607                 if (rc != 0)
608                         return rc;
609         }
610
611         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
612         if (oxe == NULL)
613                 return -ENOMEM;
614
615         mutex_lock(&osp->opd_async_requests_mutex);
616         rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
617                                       &namelen, &name, oxe,
618                                       osp_xattr_get_interpterer);
619         if (rc != 0) {
620                 mutex_unlock(&osp->opd_async_requests_mutex);
621                 osp_oac_xattr_put(oxe);
622         } else {
623                 struct dt_update_request *update;
624
625                 /* XXX: Currently, we trigger the batched async OUT
626                  *      RPC via dt_declare_xattr_get(). It is not
627                  *      perfect solution, but works well now.
628                  *
629                  *      We will improve it in the future. */
630                 update = osp->opd_async_requests;
631                 if (update != NULL && update->dur_req != NULL &&
632                     update->dur_req->ourq_count > 0) {
633                         osp->opd_async_requests = NULL;
634                         mutex_unlock(&osp->opd_async_requests_mutex);
635                         rc = osp_unplug_async_request(env, osp, update);
636                 } else {
637                         mutex_unlock(&osp->opd_async_requests_mutex);
638                 }
639         }
640
641         return rc;
642 }
643
644 int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
645                   struct lu_buf *buf, const char *name,
646                   struct lustre_capa *capa)
647 {
648         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
649         struct osp_object       *obj    = dt2osp_obj(dt);
650         struct dt_device        *dev    = &osp->opd_dt_dev;
651         struct lu_buf           *rbuf   = &osp_env_info(env)->osi_lb2;
652         struct dt_update_request *update = NULL;
653         struct ptlrpc_request   *req    = NULL;
654         struct object_update_reply *reply;
655         struct osp_xattr_entry  *oxe    = NULL;
656         const char              *dname  = dt->do_lu.lo_dev->ld_obd->obd_name;
657         int                      namelen;
658         int                      rc     = 0;
659         ENTRY;
660
661         LASSERT(buf != NULL);
662         LASSERT(name != NULL);
663
664         if (unlikely(obj->opo_non_exist))
665                 RETURN(-ENOENT);
666
667         oxe = osp_oac_xattr_find(obj, name, false);
668         if (oxe != NULL) {
669                 spin_lock(&obj->opo_lock);
670                 if (oxe->oxe_ready) {
671                         if (!oxe->oxe_exist)
672                                 GOTO(unlock, rc = -ENODATA);
673
674                         if (buf->lb_buf == NULL)
675                                 GOTO(unlock, rc = oxe->oxe_vallen);
676
677                         if (buf->lb_len < oxe->oxe_vallen)
678                                 GOTO(unlock, rc = -ERANGE);
679
680                         memcpy(buf->lb_buf, oxe->oxe_value, oxe->oxe_vallen);
681
682                         GOTO(unlock, rc = oxe->oxe_vallen);
683
684 unlock:
685                         spin_unlock(&obj->opo_lock);
686                         osp_oac_xattr_put(oxe);
687
688                         return rc;
689                 }
690                 spin_unlock(&obj->opo_lock);
691         }
692
693         update = out_create_update_req(dev);
694         if (IS_ERR(update))
695                 GOTO(out, rc = PTR_ERR(update));
696
697         namelen = strlen(name) + 1;
698         rc = out_insert_update(env, update, OUT_XATTR_GET,
699                                lu_object_fid(&dt->do_lu), 1, &namelen, &name);
700         if (rc != 0) {
701                 CERROR("%s: Insert update error "DFID": rc = %d\n",
702                        dname, PFID(lu_object_fid(&dt->do_lu)), rc);
703
704                 GOTO(out, rc);
705         }
706
707         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
708         if (rc != 0) {
709                 if (obj->opo_ooa == NULL)
710                         GOTO(out, rc);
711
712                 if (oxe == NULL)
713                         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
714
715                 if (oxe == NULL) {
716                         CWARN("%s: Fail to add xattr (%s) to cache for "
717                               DFID" (1): rc = %d\n", dname, name,
718                               PFID(lu_object_fid(&dt->do_lu)), rc);
719
720                         GOTO(out, rc);
721                 }
722
723                 spin_lock(&obj->opo_lock);
724                 if (rc == -ENOENT || rc == -ENODATA) {
725                         oxe->oxe_exist = 0;
726                         oxe->oxe_ready = 1;
727                 } else {
728                         oxe->oxe_ready = 0;
729                 }
730                 spin_unlock(&obj->opo_lock);
731
732                 GOTO(out, rc);
733         }
734
735         reply = req_capsule_server_sized_get(&req->rq_pill,
736                                              &RMF_OUT_UPDATE_REPLY,
737                                              OUT_UPDATE_REPLY_SIZE);
738         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
739                 CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
740                        dname, reply->ourp_magic, UPDATE_REPLY_MAGIC,
741                        PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
742
743                 GOTO(out, rc = -EPROTO);
744         }
745
746         rc = object_update_result_data_get(reply, rbuf, 0);
747         if (rc < 0)
748                 GOTO(out, rc);
749
750         if (buf->lb_buf == NULL)
751                 GOTO(out, rc = rbuf->lb_len);
752
753         if (unlikely(buf->lb_len < rbuf->lb_len))
754                 GOTO(out, rc = -ERANGE);
755
756         memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
757         rc = rbuf->lb_len;
758         if (obj->opo_ooa == NULL)
759                 GOTO(out, rc);
760
761         if (oxe == NULL) {
762                 oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
763                 if (oxe == NULL) {
764                         CWARN("%s: Fail to add xattr (%s) to "
765                               "cache for "DFID" (2): rc = %d\n",
766                               dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
767
768                         GOTO(out, rc);
769                 }
770         }
771
772         if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < rbuf->lb_len) {
773                 struct osp_xattr_entry *old = oxe;
774                 struct osp_xattr_entry *tmp;
775
776                 tmp = osp_oac_xattr_replace(obj, &old, rbuf->lb_len);
777                 osp_oac_xattr_put(oxe);
778                 oxe = tmp;
779                 if (tmp == NULL) {
780                         CWARN("%s: Fail to update xattr (%s) to "
781                               "cache for "DFID": rc = %d\n",
782                               dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
783                         spin_lock(&obj->opo_lock);
784                         old->oxe_ready = 0;
785                         spin_unlock(&obj->opo_lock);
786
787                         GOTO(out, rc);
788                 }
789
790                 /* Drop the ref for entry on list. */
791                 osp_oac_xattr_put(old);
792         }
793
794         spin_lock(&obj->opo_lock);
795         oxe->oxe_vallen = rbuf->lb_len;
796         memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
797         oxe->oxe_exist = 1;
798         oxe->oxe_ready = 1;
799         spin_unlock(&obj->opo_lock);
800
801         GOTO(out, rc);
802
803 out:
804         if (req != NULL)
805                 ptlrpc_req_finished(req);
806
807         if (update != NULL && !IS_ERR(update))
808                 out_destroy_update_req(update);
809
810         if (oxe != NULL)
811                 osp_oac_xattr_put(oxe);
812
813         return rc;
814 }
815
816 static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
817                            const struct lu_buf *buf, const char *name,
818                            int flag, struct thandle *th)
819 {
820         struct dt_update_request *update;
821         struct lu_fid            *fid;
822         int                      sizes[3]       = { strlen(name),
823                                                     buf->lb_len,
824                                                     sizeof(int) };
825         char                     *bufs[3]       = { (char *)name,
826                                                     (char *)buf->lb_buf };
827         struct osp_xattr_entry   *oxe;
828         struct osp_object        *o             = dt2osp_obj(dt);
829         int                      rc;
830         ENTRY;
831
832         LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
833
834         update = out_find_create_update_loc(th, dt);
835         if (IS_ERR(update)) {
836                 CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n",
837                        dt->do_lu.lo_dev->ld_obd->obd_name,
838                        PFID(lu_object_fid(&dt->do_lu)),
839                        (int)PTR_ERR(update));
840
841                 RETURN(PTR_ERR(update));
842         }
843
844         flag = cpu_to_le32(flag);
845         bufs[2] = (char *)&flag;
846
847         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
848         rc = out_insert_update(env, update, OUT_XATTR_SET, fid,
849                                ARRAY_SIZE(sizes), sizes, (const char **)bufs);
850         if (rc != 0 || o->opo_ooa == NULL)
851                 RETURN(rc);
852
853         oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
854         if (oxe == NULL) {
855                 CWARN("%s: Fail to add xattr (%s) to cache for "DFID,
856                       dt->do_lu.lo_dev->ld_obd->obd_name,
857                       name, PFID(lu_object_fid(&dt->do_lu)));
858
859                 RETURN(0);
860         }
861
862         if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
863                 struct osp_xattr_entry *old = oxe;
864                 struct osp_xattr_entry *tmp;
865
866                 tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
867                 osp_oac_xattr_put(oxe);
868                 oxe = tmp;
869                 if (tmp == NULL) {
870                         CWARN("%s: Fail to update xattr (%s) to cache for "DFID,
871                               dt->do_lu.lo_dev->ld_obd->obd_name,
872                               name, PFID(lu_object_fid(&dt->do_lu)));
873                         spin_lock(&o->opo_lock);
874                         old->oxe_ready = 0;
875                         spin_unlock(&o->opo_lock);
876
877                         RETURN(0);
878                 }
879
880                 /* Drop the ref for entry on list. */
881                 osp_oac_xattr_put(old);
882         }
883
884         spin_lock(&o->opo_lock);
885         oxe->oxe_vallen = buf->lb_len;
886         memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
887         oxe->oxe_exist = 1;
888         oxe->oxe_ready = 1;
889         spin_unlock(&o->opo_lock);
890         osp_oac_xattr_put(oxe);
891
892         RETURN(0);
893 }
894
895 int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
896                           const struct lu_buf *buf, const char *name,
897                           int flag, struct thandle *th)
898 {
899         int rc = 0;
900
901         /* Please check the comment in osp_attr_set() for handling
902          * remote transaction. */
903         if (!is_only_remote_trans(th))
904                 rc = __osp_xattr_set(env, dt, buf, name, flag, th);
905
906         return rc;
907 }
908
909 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
910                   const struct lu_buf *buf, const char *name, int fl,
911                   struct thandle *th, struct lustre_capa *capa)
912 {
913         int rc = 0;
914
915         CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
916                PFID(&dt->do_lu.lo_header->loh_fid));
917
918         /* Please check the comment in osp_attr_set() for handling
919          * remote transaction. */
920         if (is_only_remote_trans(th))
921                 rc = __osp_xattr_set(env, dt, buf, name, fl, th);
922
923         return rc;
924 }
925
926 static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
927                            const char *name, struct thandle *th)
928 {
929         struct dt_update_request *update;
930         const struct lu_fid      *fid;
931         struct osp_object        *o     = dt2osp_obj(dt);
932         struct osp_xattr_entry   *oxe;
933         int                       size  = strlen(name);
934         int                       rc;
935
936         update = out_find_create_update_loc(th, dt);
937         if (IS_ERR(update))
938                 return PTR_ERR(update);
939
940         fid = lu_object_fid(&dt->do_lu);
941
942         rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size,
943                                (const char **)&name);
944         if (rc != 0 || o->opo_ooa == NULL)
945                 return rc;
946
947         oxe = osp_oac_xattr_find(o, name, true);
948         if (oxe != NULL)
949                 /* Drop the ref for entry on list. */
950                 osp_oac_xattr_put(oxe);
951
952         return 0;
953 }
954
955 int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
956                           const char *name, struct thandle *th)
957 {
958         int rc = 0;
959
960         /* Please check the comment in osp_attr_set() for handling
961          * remote transaction. */
962         if (!is_only_remote_trans(th))
963                 rc = __osp_xattr_del(env, dt, name, th);
964
965         return rc;
966 }
967
968 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
969                   const char *name, struct thandle *th,
970                   struct lustre_capa *capa)
971 {
972         int rc = 0;
973
974         CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name,
975                PFID(&dt->do_lu.lo_header->loh_fid));
976
977         /* Please check the comment in osp_attr_set() for handling
978          * remote transaction. */
979         if (is_only_remote_trans(th))
980                 rc = __osp_xattr_del(env, dt, name, th);
981
982         return rc;
983 }
984
985 static int osp_declare_object_create(const struct lu_env *env,
986                                      struct dt_object *dt,
987                                      struct lu_attr *attr,
988                                      struct dt_allocation_hint *hint,
989                                      struct dt_object_format *dof,
990                                      struct thandle *th)
991 {
992         struct osp_thread_info  *osi = osp_env_info(env);
993         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
994         struct osp_object       *o = dt2osp_obj(dt);
995         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
996         int                      rc = 0;
997
998         ENTRY;
999
1000         if (is_only_remote_trans(th)) {
1001                 LASSERT(fid_is_sane(fid));
1002
1003                 rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th);
1004
1005                 RETURN(rc);
1006         }
1007
1008         /* should happen to non-0 OSP only so that at least one object
1009          * has been already declared in the scenario and LOD should
1010          * cleanup that */
1011         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
1012                 RETURN(-ENOSPC);
1013
1014         LASSERT(d->opd_last_used_oid_file);
1015
1016         /*
1017          * There can be gaps in precreated ids and record to unlink llog
1018          * XXX: we do not handle gaps yet, implemented before solution
1019          *      was found to be racy, so we disabled that. there is no
1020          *      point in making useless but expensive llog declaration.
1021          */
1022         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
1023
1024         if (unlikely(!fid_is_zero(fid))) {
1025                 /* replay case: caller knows fid */
1026                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
1027                 osi->osi_lb.lb_len = sizeof(osi->osi_id);
1028                 osi->osi_lb.lb_buf = NULL;
1029                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1030                                              &osi->osi_lb, osi->osi_off, th);
1031                 RETURN(rc);
1032         }
1033
1034         /*
1035          * in declaration we need to reserve object so that we don't block
1036          * awaiting precreation RPC to complete
1037          */
1038         rc = osp_precreate_reserve(env, d);
1039         /*
1040          * we also need to declare update to local "last used id" file for
1041          * recovery if object isn't used for a reason, we need to release
1042          * reservation, this can be made in osd_object_release()
1043          */
1044         if (rc == 0) {
1045                 /* mark id is reserved: in create we don't want to talk
1046                  * to OST */
1047                 LASSERT(o->opo_reserved == 0);
1048                 o->opo_reserved = 1;
1049
1050                 /* common for all OSPs file hystorically */
1051                 osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
1052                 osi->osi_lb.lb_len = sizeof(osi->osi_id);
1053                 osi->osi_lb.lb_buf = NULL;
1054                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1055                                              &osi->osi_lb, osi->osi_off, th);
1056         } else {
1057                 /* not needed in the cache anymore */
1058                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1059                             &dt->do_lu.lo_header->loh_flags);
1060         }
1061         RETURN(rc);
1062 }
1063
1064 static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
1065                              struct lu_attr *attr,
1066                              struct dt_allocation_hint *hint,
1067                              struct dt_object_format *dof, struct thandle *th)
1068 {
1069         struct osp_thread_info  *osi = osp_env_info(env);
1070         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
1071         struct osp_object       *o = dt2osp_obj(dt);
1072         int                     rc = 0;
1073         struct lu_fid           *fid = &osi->osi_fid;
1074         ENTRY;
1075
1076         if (is_only_remote_trans(th)) {
1077                 LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu)));
1078
1079                 rc = osp_md_object_create(env, dt, attr, hint, dof, th);
1080                 if (rc == 0)
1081                         o->opo_non_exist = 0;
1082
1083                 RETURN(rc);
1084         }
1085
1086         o->opo_non_exist = 0;
1087         if (o->opo_reserved) {
1088                 /* regular case, fid is assigned holding trunsaction open */
1089                  osp_object_assign_fid(env, d, o);
1090         }
1091
1092         memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
1093
1094         LASSERTF(fid_is_sane(fid), "fid for osp_object %p is insane"DFID"!\n",
1095                  o, PFID(fid));
1096
1097         if (!o->opo_reserved) {
1098                 /* special case, id was assigned outside of transaction
1099                  * see comments in osp_declare_attr_set */
1100                 LASSERT(d->opd_pre != NULL);
1101                 spin_lock(&d->opd_pre_lock);
1102                 osp_update_last_fid(d, fid);
1103                 spin_unlock(&d->opd_pre_lock);
1104         }
1105
1106         CDEBUG(D_INODE, "fid for osp_object %p is "DFID"\n", o, PFID(fid));
1107
1108         /* If the precreate ends, it means it will be ready to rollover to
1109          * the new sequence soon, all the creation should be synchronized,
1110          * otherwise during replay, the replay fid will be inconsistent with
1111          * last_used/create fid */
1112         if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
1113                 th->th_sync = 1;
1114
1115         /*
1116          * it's OK if the import is inactive by this moment - id was created
1117          * by OST earlier, we just need to maintain it consistently on the disk
1118          * once import is reconnected, OSP will claim this and other objects
1119          * used and OST either keep them, if they exist or recreate
1120          */
1121
1122         /* we might have lost precreated objects */
1123         if (unlikely(d->opd_gap_count) > 0) {
1124                 LASSERT(d->opd_pre != NULL);
1125                 spin_lock(&d->opd_pre_lock);
1126                 if (d->opd_gap_count > 0) {
1127                         int count = d->opd_gap_count;
1128
1129                         ostid_set_id(&osi->osi_oi,
1130                                      fid_oid(&d->opd_gap_start_fid));
1131                         d->opd_gap_count = 0;
1132                         spin_unlock(&d->opd_pre_lock);
1133
1134                         CDEBUG(D_HA, "Writting gap "DFID"+%d in llog\n",
1135                                PFID(&d->opd_gap_start_fid), count);
1136                         /* real gap handling is disabled intil ORI-692 will be
1137                          * fixed, now we only report gaps */
1138                 } else {
1139                         spin_unlock(&d->opd_pre_lock);
1140                 }
1141         }
1142
1143         /* new object, the very first ->attr_set()
1144          * initializing attributes needs no logging */
1145         o->opo_new = 1;
1146
1147         /* Only need update last_used oid file, seq file will only be update
1148          * during seq rollover */
1149         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
1150                            &d->opd_last_used_fid.f_oid, d->opd_index);
1151
1152         rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
1153                              &osi->osi_off, th);
1154
1155         CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
1156                d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
1157
1158         RETURN(rc);
1159 }
1160
1161 int osp_declare_object_destroy(const struct lu_env *env,
1162                                struct dt_object *dt, struct thandle *th)
1163 {
1164         struct osp_object       *o = dt2osp_obj(dt);
1165         int                      rc = 0;
1166
1167         ENTRY;
1168
1169         /*
1170          * track objects to be destroyed via llog
1171          */
1172         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1173
1174         RETURN(rc);
1175 }
1176
1177 int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
1178                        struct thandle *th)
1179 {
1180         struct osp_object       *o = dt2osp_obj(dt);
1181         int                      rc = 0;
1182
1183         ENTRY;
1184
1185         o->opo_non_exist = 1;
1186         /*
1187          * once transaction is committed put proper command on
1188          * the queue going to our OST
1189          */
1190         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1191
1192         /* not needed in cache any more */
1193         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1194
1195         RETURN(rc);
1196 }
1197
1198 static int osp_orphan_index_lookup(const struct lu_env *env,
1199                                    struct dt_object *dt,
1200                                    struct dt_rec *rec,
1201                                    const struct dt_key *key,
1202                                    struct lustre_capa *capa)
1203 {
1204         return -EOPNOTSUPP;
1205 }
1206
1207 static int osp_orphan_index_declare_insert(const struct lu_env *env,
1208                                            struct dt_object *dt,
1209                                            const struct dt_rec *rec,
1210                                            const struct dt_key *key,
1211                                            struct thandle *handle)
1212 {
1213         return -EOPNOTSUPP;
1214 }
1215
1216 static int osp_orphan_index_insert(const struct lu_env *env,
1217                                    struct dt_object *dt,
1218                                    const struct dt_rec *rec,
1219                                    const struct dt_key *key,
1220                                    struct thandle *handle,
1221                                    struct lustre_capa *capa,
1222                                    int ignore_quota)
1223 {
1224         return -EOPNOTSUPP;
1225 }
1226
1227 static int osp_orphan_index_declare_delete(const struct lu_env *env,
1228                                            struct dt_object *dt,
1229                                            const struct dt_key *key,
1230                                            struct thandle *handle)
1231 {
1232         return -EOPNOTSUPP;
1233 }
1234
1235 static int osp_orphan_index_delete(const struct lu_env *env,
1236                                    struct dt_object *dt,
1237                                    const struct dt_key *key,
1238                                    struct thandle *handle,
1239                                    struct lustre_capa *capa)
1240 {
1241         return -EOPNOTSUPP;
1242 }
1243
1244 struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
1245                           __u32 attr, struct lustre_capa *capa)
1246 {
1247         struct osp_it *it;
1248
1249         OBD_ALLOC_PTR(it);
1250         if (it == NULL)
1251                 return ERR_PTR(-ENOMEM);
1252
1253         it->ooi_pos_ent = -1;
1254         it->ooi_obj = dt;
1255
1256         return (struct dt_it *)it;
1257 }
1258
1259 void osp_it_fini(const struct lu_env *env, struct dt_it *di)
1260 {
1261         struct osp_it   *it = (struct osp_it *)di;
1262         struct page     **pages = it->ooi_pages;
1263         int             npages = it->ooi_total_npages;
1264         int             i;
1265
1266         if (pages != NULL) {
1267                 for (i = 0; i < npages; i++) {
1268                         if (pages[i] != NULL) {
1269                                 if (pages[i] == it->ooi_cur_page) {
1270                                         kunmap(pages[i]);
1271                                         it->ooi_cur_page = NULL;
1272                                 }
1273                                 __free_page(pages[i]);
1274                         }
1275                 }
1276                 OBD_FREE(pages, npages * sizeof(*pages));
1277         }
1278         OBD_FREE_PTR(it);
1279 }
1280
1281 static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
1282 {
1283         struct lu_device         *dev   = it->ooi_obj->do_lu.lo_dev;
1284         struct osp_device        *osp   = lu2osp_dev(dev);
1285         struct page             **pages;
1286         struct ptlrpc_request    *req   = NULL;
1287         struct ptlrpc_bulk_desc  *desc;
1288         struct idx_info          *ii;
1289         int                       npages;
1290         int                       rc;
1291         int                       i;
1292         ENTRY;
1293
1294         /* 1MB bulk */
1295         npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20);
1296         npages /= PAGE_CACHE_SIZE;
1297
1298         OBD_ALLOC(pages, npages * sizeof(*pages));
1299         if (pages == NULL)
1300                 RETURN(-ENOMEM);
1301
1302         it->ooi_pages = pages;
1303         it->ooi_total_npages = npages;
1304         for (i = 0; i < npages; i++) {
1305                 pages[i] = alloc_page(GFP_IOFS);
1306                 if (pages[i] == NULL)
1307                         RETURN(-ENOMEM);
1308         }
1309
1310         req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import,
1311                                    &RQF_OBD_IDX_READ);
1312         if (req == NULL)
1313                 RETURN(-ENOMEM);
1314
1315         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
1316         if (rc != 0) {
1317                 ptlrpc_request_free(req);
1318                 RETURN(rc);
1319         }
1320
1321         req->rq_request_portal = OUT_PORTAL;
1322         ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
1323         memset(ii, 0, sizeof(*ii));
1324         if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) {
1325                 /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE,
1326                  * ost_index, 0] with LAST_ID FID, so it needs to replace
1327                  * the FID with orphan FID here */
1328                 ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE;
1329                 ii->ii_fid.f_oid = osp->opd_index;
1330                 ii->ii_fid.f_ver = 0;
1331                 ii->ii_flags = II_FL_NOHASH;
1332         } else {
1333                 ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu);
1334                 ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY |
1335                                II_FL_VARREC;
1336         }
1337         ii->ii_magic = IDX_INFO_MAGIC;
1338         ii->ii_count = npages * LU_PAGE_COUNT;
1339         ii->ii_hash_start = it->ooi_next;
1340         ii->ii_attrs =
1341                 osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
1342
1343         ptlrpc_at_set_req_timeout(req);
1344
1345         desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
1346                                     MDS_BULK_PORTAL);
1347         if (desc == NULL) {
1348                 ptlrpc_request_free(req);
1349                 RETURN(-ENOMEM);
1350         }
1351
1352         for (i = 0; i < npages; i++)
1353                 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1354
1355         ptlrpc_request_set_replen(req);
1356         rc = ptlrpc_queue_wait(req);
1357         if (rc != 0)
1358                 GOTO(out, rc);
1359
1360         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1361                                           req->rq_bulk->bd_nob_transferred);
1362         if (rc < 0)
1363                 GOTO(out, rc);
1364         rc = 0;
1365
1366         ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
1367         if (ii->ii_magic != IDX_INFO_MAGIC)
1368                  GOTO(out, rc = -EPROTO);
1369
1370         npages = (ii->ii_count + LU_PAGE_COUNT - 1) >>
1371                  (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT);
1372         if (npages > it->ooi_total_npages) {
1373                 CERROR("%s: returned more pages than expected, %u > %u\n",
1374                        osp->opd_obd->obd_name, npages, it->ooi_total_npages);
1375                 GOTO(out, rc = -EINVAL);
1376         }
1377
1378         it->ooi_valid_npages = npages;
1379         if (ptlrpc_rep_need_swab(req))
1380                 it->ooi_swab = 1;
1381
1382         it->ooi_next = ii->ii_hash_end;
1383
1384 out:
1385         ptlrpc_req_finished(req);
1386
1387         return rc;
1388 }
1389
1390 int osp_it_next_page(const struct lu_env *env, struct dt_it *di)
1391 {
1392         struct osp_it           *it = (struct osp_it *)di;
1393         struct lu_idxpage       *idxpage;
1394         struct page             **pages;
1395         int                     rc;
1396         int                     i;
1397         ENTRY;
1398
1399 again2:
1400         idxpage = it->ooi_cur_idxpage;
1401         if (idxpage != NULL) {
1402                 if (idxpage->lip_nr == 0)
1403                         RETURN(1);
1404
1405                 if (it->ooi_pos_ent < idxpage->lip_nr) {
1406                         CDEBUG(D_INFO, "ooi_pos %d nr %d\n",
1407                                (int)it->ooi_pos_ent, (int)idxpage->lip_nr);
1408                         RETURN(0);
1409                 }
1410                 it->ooi_cur_idxpage = NULL;
1411                 it->ooi_pos_lu_page++;
1412 again1:
1413                 if (it->ooi_pos_lu_page < LU_PAGE_COUNT) {
1414                         it->ooi_cur_idxpage = (void *)it->ooi_cur_page +
1415                                          LU_PAGE_SIZE * it->ooi_pos_lu_page;
1416                         if (it->ooi_swab)
1417                                 lustre_swab_lip_header(it->ooi_cur_idxpage);
1418                         if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) {
1419                                 struct osp_device *osp =
1420                                         lu2osp_dev(it->ooi_obj->do_lu.lo_dev);
1421
1422                                 CERROR("%s: invalid magic (%x != %x) for page "
1423                                        "%d/%d while read layout orphan index\n",
1424                                        osp->opd_obd->obd_name,
1425                                        it->ooi_cur_idxpage->lip_magic,
1426                                        LIP_MAGIC, it->ooi_pos_page,
1427                                        it->ooi_pos_lu_page);
1428                                 /* Skip this lu_page next time. */
1429                                 it->ooi_pos_ent = idxpage->lip_nr - 1;
1430                                 RETURN(-EINVAL);
1431                         }
1432                         it->ooi_pos_ent = -1;
1433                         goto again2;
1434                 }
1435
1436                 kunmap(it->ooi_cur_page);
1437                 it->ooi_cur_page = NULL;
1438                 it->ooi_pos_page++;
1439
1440 again0:
1441                 pages = it->ooi_pages;
1442                 if (it->ooi_pos_page < it->ooi_valid_npages) {
1443                         it->ooi_cur_page = kmap(pages[it->ooi_pos_page]);
1444                         it->ooi_pos_lu_page = 0;
1445                         goto again1;
1446                 }
1447
1448                 for (i = 0; i < it->ooi_total_npages; i++) {
1449                         if (pages[i] != NULL)
1450                                 __free_page(pages[i]);
1451                 }
1452                 OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages));
1453
1454                 it->ooi_pos_page = 0;
1455                 it->ooi_total_npages = 0;
1456                 it->ooi_valid_npages = 0;
1457                 it->ooi_swab = 0;
1458                 it->ooi_ent = NULL;
1459                 it->ooi_cur_page = NULL;
1460                 it->ooi_cur_idxpage = NULL;
1461                 it->ooi_pages = NULL;
1462         }
1463
1464         if (it->ooi_next == II_END_OFF)
1465                 RETURN(1);
1466
1467         rc = osp_it_fetch(env, it);
1468         if (rc == 0)
1469                 goto again0;
1470
1471         RETURN(rc);
1472 }
1473
1474 int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di)
1475 {
1476         struct osp_it           *it = (struct osp_it *)di;
1477         struct lu_idxpage       *idxpage;
1478         int                     rc;
1479         ENTRY;
1480
1481 again:
1482         idxpage = it->ooi_cur_idxpage;
1483         if (idxpage != NULL) {
1484                 if (idxpage->lip_nr == 0)
1485                         RETURN(1);
1486
1487                 it->ooi_pos_ent++;
1488                 if (it->ooi_pos_ent < idxpage->lip_nr) {
1489                         it->ooi_ent =
1490                                 (struct lu_orphan_ent *)idxpage->lip_entries +
1491                                                         it->ooi_pos_ent;
1492                         if (it->ooi_swab)
1493                                 lustre_swab_orphan_ent(it->ooi_ent);
1494                         RETURN(0);
1495                 }
1496         }
1497
1498         rc = osp_it_next_page(env, di);
1499         if (rc == 0)
1500                 goto again;
1501
1502         RETURN(rc);
1503 }
1504
1505 int osp_it_get(const struct lu_env *env, struct dt_it *di,
1506                const struct dt_key *key)
1507 {
1508         return 1;
1509 }
1510
1511 void osp_it_put(const struct lu_env *env, struct dt_it *di)
1512 {
1513 }
1514
1515 struct dt_key *osp_orphan_it_key(const struct lu_env *env,
1516                                  const struct dt_it *di)
1517 {
1518         struct osp_it   *it  = (struct osp_it *)di;
1519         struct lu_orphan_ent    *ent = (struct lu_orphan_ent *)it->ooi_ent;
1520
1521         if (likely(ent != NULL))
1522                 return (struct dt_key *)(&ent->loe_key);
1523
1524         return NULL;
1525 }
1526
1527 int osp_orphan_it_key_size(const struct lu_env *env, const struct dt_it *di)
1528 {
1529         return sizeof(struct lu_fid);
1530 }
1531
1532 int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di,
1533                       struct dt_rec *rec, __u32 attr)
1534 {
1535         struct osp_it   *it  = (struct osp_it *)di;
1536         struct lu_orphan_ent    *ent = (struct lu_orphan_ent *)it->ooi_ent;
1537
1538         if (likely(ent != NULL)) {
1539                 *(struct lu_orphan_rec *)rec = ent->loe_rec;
1540                 return 0;
1541         }
1542
1543         return -EINVAL;
1544 }
1545
1546 __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
1547 {
1548         struct osp_it   *it = (struct osp_it *)di;
1549
1550         return it->ooi_next;
1551 }
1552
1553 /**
1554  * \retval       +1: locate to the exactly position
1555  * \retval        0: cannot locate to the exactly position,
1556  *                   call next() to move to a valid position.
1557  * \retval      -ve: on error
1558  */
1559 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
1560                        __u64 hash)
1561 {
1562         struct osp_it   *it     = (struct osp_it *)di;
1563         int              rc;
1564
1565         it->ooi_next = hash;
1566         rc = osp_orphan_it_next(env, (struct dt_it *)di);
1567         if (rc == 1)
1568                 return 0;
1569
1570         if (rc == 0)
1571                 return 1;
1572
1573         return rc;
1574 }
1575
1576 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
1577                    void *key_rec)
1578 {
1579         return 0;
1580 }
1581
1582 static const struct dt_index_operations osp_orphan_index_ops = {
1583         .dio_lookup             = osp_orphan_index_lookup,
1584         .dio_declare_insert     = osp_orphan_index_declare_insert,
1585         .dio_insert             = osp_orphan_index_insert,
1586         .dio_declare_delete     = osp_orphan_index_declare_delete,
1587         .dio_delete             = osp_orphan_index_delete,
1588         .dio_it = {
1589                 .init           = osp_it_init,
1590                 .fini           = osp_it_fini,
1591                 .next           = osp_orphan_it_next,
1592                 .get            = osp_it_get,
1593                 .put            = osp_it_put,
1594                 .key            = osp_orphan_it_key,
1595                 .key_size       = osp_orphan_it_key_size,
1596                 .rec            = osp_orphan_it_rec,
1597                 .store          = osp_it_store,
1598                 .load           = osp_orphan_it_load,
1599                 .key_rec        = osp_it_key_rec,
1600         }
1601 };
1602
1603 static int osp_index_try(const struct lu_env *env,
1604                          struct dt_object *dt,
1605                          const struct dt_index_features *feat)
1606 {
1607         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1608
1609         if (fid_is_last_id(fid) && fid_is_idif(fid))
1610                 dt->do_index_ops = &osp_orphan_index_ops;
1611         else
1612                 dt->do_index_ops = &osp_md_index_ops;
1613         return 0;
1614 }
1615
1616 struct dt_object_operations osp_obj_ops = {
1617         .do_declare_attr_get    = osp_declare_attr_get,
1618         .do_attr_get            = osp_attr_get,
1619         .do_declare_attr_set    = osp_declare_attr_set,
1620         .do_attr_set            = osp_attr_set,
1621         .do_declare_xattr_get   = osp_declare_xattr_get,
1622         .do_xattr_get           = osp_xattr_get,
1623         .do_declare_xattr_set   = osp_declare_xattr_set,
1624         .do_xattr_set           = osp_xattr_set,
1625         .do_declare_create      = osp_declare_object_create,
1626         .do_create              = osp_object_create,
1627         .do_declare_destroy     = osp_declare_object_destroy,
1628         .do_destroy             = osp_object_destroy,
1629         .do_index_try           = osp_index_try,
1630 };
1631
1632 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
1633                            const struct lu_object_conf *conf)
1634 {
1635         struct osp_object       *po = lu2osp_obj(o);
1636         int                     rc = 0;
1637         ENTRY;
1638
1639         spin_lock_init(&po->opo_lock);
1640         o->lo_header->loh_attr |= LOHA_REMOTE;
1641
1642         if (is_ost_obj(o)) {
1643                 po->opo_obj.do_ops = &osp_obj_ops;
1644         } else {
1645                 struct lu_attr *la = &osp_env_info(env)->osi_attr;
1646
1647                 po->opo_obj.do_ops = &osp_md_obj_ops;
1648                 po->opo_obj.do_body_ops = &osp_md_body_ops;
1649                 rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
1650                                                      la, NULL);
1651                 if (rc == 0)
1652                         o->lo_header->loh_attr |=
1653                                 LOHA_EXISTS | (la->la_mode & S_IFMT);
1654                 if (rc == -ENOENT) {
1655                         po->opo_non_exist = 1;
1656                         rc = 0;
1657                 }
1658                 init_rwsem(&po->opo_sem);
1659         }
1660         RETURN(rc);
1661 }
1662
1663 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
1664 {
1665         struct osp_object       *obj = lu2osp_obj(o);
1666         struct lu_object_header *h = o->lo_header;
1667
1668         dt_object_fini(&obj->opo_obj);
1669         lu_object_header_fini(h);
1670         if (obj->opo_ooa != NULL) {
1671                 struct osp_xattr_entry *oxe;
1672                 struct osp_xattr_entry *tmp;
1673                 int                     count;
1674
1675                 list_for_each_entry_safe(oxe, tmp,
1676                                          &obj->opo_ooa->ooa_xattr_list,
1677                                          oxe_list) {
1678                         list_del(&oxe->oxe_list);
1679                         count = atomic_read(&oxe->oxe_ref);
1680                         LASSERTF(count == 1,
1681                                  "Still has %d users on the xattr entry %.*s\n",
1682                                  count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
1683
1684                         OBD_FREE(oxe, oxe->oxe_buflen);
1685                 }
1686                 OBD_FREE_PTR(obj->opo_ooa);
1687         }
1688         OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
1689 }
1690
1691 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
1692 {
1693         struct osp_object       *po = lu2osp_obj(o);
1694         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
1695
1696         ENTRY;
1697
1698         /*
1699          * release reservation if object was declared but not created
1700          * this may require lu_object_put() in LOD
1701          */
1702         if (unlikely(po->opo_reserved)) {
1703                 LASSERT(d->opd_pre != NULL);
1704                 LASSERT(d->opd_pre_reserved > 0);
1705                 spin_lock(&d->opd_pre_lock);
1706                 d->opd_pre_reserved--;
1707                 spin_unlock(&d->opd_pre_lock);
1708
1709                 /* not needed in cache any more */
1710                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
1711         }
1712
1713         if (is_ost_obj(o))
1714                 /* XXX: Currently, NOT cache OST-object on MDT because:
1715                  *      1. it is not often accessed on MDT.
1716                  *      2. avoid up layer (such as LFSCK) to load too many
1717                  *         once-used OST-objects. */
1718                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
1719
1720         EXIT;
1721 }
1722
1723 static int osp_object_print(const struct lu_env *env, void *cookie,
1724                             lu_printer_t p, const struct lu_object *l)
1725 {
1726         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
1727
1728         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
1729 }
1730
1731 static int osp_object_invariant(const struct lu_object *o)
1732 {
1733         LBUG();
1734 }
1735
1736 struct lu_object_operations osp_lu_obj_ops = {
1737         .loo_object_init        = osp_object_init,
1738         .loo_object_free        = osp_object_free,
1739         .loo_object_release     = osp_object_release,
1740         .loo_object_print       = osp_object_print,
1741         .loo_object_invariant   = osp_object_invariant
1742 };