Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * lustre/osp/osp_object.c
30  *
31  * Lustre OST Proxy Device (OSP) is the agent on the local MDT for the OST
32  * or remote MDT.
33  *
34  * OSP object attributes cache
35  * ---------------------------
36  * OSP object is the stub of the remote OST-object or MDT-object. Both the
37  * attribute and the extended attributes are stored on the peer side remotely.
38  * It is inefficient to send RPC to peer to fetch those attributes when every
39  * get_attr()/get_xattr() called. For a large system, the LFSCK synchronous
40  * mode scanning is prohibitively inefficient.
41  *
42  * The OSP maintains the OSP object attributes cache to cache some
43  * attributes on the local MDT.
44  *
45  * The basic attributes, such as owner/mode/flags, are stored in the
46  * osp_object::opo_attr. The extended attributes will be stored
47  * as osp_xattr_entry. Every extended attribute has an independent
48  * osp_xattr_entry, and all the osp_xattr_entry are linked into the
49  * osp_object::opo_xattr_list. The OSP object attributes cache
50  * is protected by the osp_object::opo_lock.
51  *
52  * Not all OSP objects have an attributes cache because maintaining
53  * the cache requires some resources. Currently, the OSP object
54  * attributes cache will be initialized when the attributes or the
55  * extended attributes are pre-fetched via osp_declare_attr_get()
56  * or osp_declare_xattr_get(). That is usually for LFSCK purpose,
57  * but it also can be shared by others.
58  *
59  *
60  * XXX: NOT prepare out RPC for remote transaction. ((please refer to the
61  *      comment of osp_trans_create() for remote transaction)
62  *
63  * According to our current transaction/dt_object_lock framework (to make
64  * the cross-MDTs modification for DNE1 to be workable), the transaction
65  * sponsor will start the transaction firstly, then try to acquire related
66  * dt_object_lock if needed. Under such rules, if we want to prepare the
67  * OUT RPC in the transaction declare phase, then related attr/xattr
68  * should be known without dt_object_lock. But such condition maybe not
69  * true for some remote transaction case. For example:
70  *
71  * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
72  * the dt_object_lock on the target MDT-object, it cannot know whether
73  * the MDT-object has linkEA or not, neither invalid or not.
74  *
75  * Since the LFSCK thread cannot hold dt_object_lock before the remote
76  * transaction start (otherwise there will be some potential deadlock),
77  * it cannot prepare related OUT RPC for repairing during the declare
78  * phase as other normal transactions do.
79  *
80  * To resolve the trouble, we will make OSP to prepare related OUT RPC
81  * after remote transaction started, and trigger the remote updating
82  * (send RPC) when trans_stop. Then the up layer users, such as LFSCK,
83  * can follow the general rule to handle trans_start/dt_object_lock
84  * for repairing linkEA inconsistency without distinguishing remote
85  * MDT-object.
86  *
87  * In fact, above solution for remote transaction should be the normal
88  * model without considering DNE1. The trouble brought by DNE1 will be
89  * resolved in DNE2. At that time, this patch can be removed.
90  *
91  *
92  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
93  * Author: Mikhail Pershin <mike.tappro@intel.com>
94  */
95
96 #define DEBUG_SUBSYSTEM S_MDS
97
98 #include <lustre_obdo.h>
99 #include <lustre_swab.h>
100
101 #include "osp_internal.h"
102
103 static inline __u32 osp_dev2node(struct osp_device *osp)
104 {
105         return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
106 }
107
108 static inline const char *osp_dto2name(struct osp_object *obj)
109 {
110         return obj->opo_obj.do_lu.lo_dev->ld_obd->obd_name;
111 }
112
113 static inline bool is_ost_obj(struct lu_object *lo)
114 {
115         return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
116 }
117
118 static inline void __osp_oac_xattr_assignment(struct osp_object *obj,
119                                               struct osp_xattr_entry *oxe,
120                                               const struct lu_buf *buf)
121 {
122         if (buf->lb_len > 0)
123                 memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
124
125         oxe->oxe_vallen = buf->lb_len;
126         oxe->oxe_exist = 1;
127         oxe->oxe_ready = 1;
128 }
129
130 /**
131  * Assign FID to the OST object.
132  *
133  * This function will assign the FID to the OST object of a striped file.
134  *
135  * \param[in] env       pointer to the thread context
136  * \param[in] d         pointer to the OSP device
137  * \param[in] o         pointer to the OSP object that the FID will be
138  *                      assigned to
139  */
140 static void osp_object_assign_fid(const struct lu_env *env,
141                                   struct osp_device *d, struct osp_object *o)
142 {
143         struct osp_thread_info *osi = osp_env_info(env);
144
145         LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
146         LASSERT(o->opo_reserved);
147         o->opo_reserved = 0;
148
149         osp_precreate_get_fid(env, d, &osi->osi_fid);
150
151         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
152 }
153
154 #define OXE_DEFAULT_LEN 16
155
156 /**
157  * Allocate osp_xattr_entry.
158  *
159  * If total size exceeds PAGE_SIZE, name and value will allocated in a
160  * separate buf, otherwise it's allocated inline.
161  *
162  * \param[in] name      pointer to XATTR name
163  * \param[in] namelen   XATTR name len
164  * \param[in] vallen    XATTR value len
165  * \retval              oxe pointer on success
166  * \retval              NULL on failure
167  */
168 static struct osp_xattr_entry *osp_oac_xattr_alloc(const char *name,
169                                                    size_t namelen,
170                                                    size_t vallen)
171 {
172         struct osp_xattr_entry *oxe;
173         size_t size;
174
175         if (!vallen)
176                 vallen = OXE_DEFAULT_LEN;
177         size = sizeof(*oxe) + namelen + 1 + vallen;
178         if (likely(size <= PAGE_SIZE)) {
179                 OBD_ALLOC(oxe, size);
180                 if (unlikely(!oxe))
181                         return NULL;
182                 oxe->oxe_buflen = size;
183                 oxe->oxe_value = oxe->oxe_name + namelen + 1;
184         } else {
185                 char *buf;
186
187                 OBD_ALLOC_LARGE(buf, vallen);
188                 if (unlikely(!buf))
189                         return NULL;
190
191                 size -= vallen;
192                 OBD_ALLOC(oxe, size);
193                 if (unlikely(!oxe)) {
194                         OBD_FREE(buf, size);
195                         return NULL;
196                 }
197                 oxe->oxe_buflen = vallen;
198                 oxe->oxe_value = buf;
199                 oxe->oxe_largebuf = 1;
200         }
201
202         INIT_LIST_HEAD(&oxe->oxe_list);
203
204         oxe->oxe_namelen = namelen;
205         memcpy(oxe->oxe_name, name, namelen);
206         /* One ref is for the caller, the other is for the entry on the list. */
207         atomic_set(&oxe->oxe_ref, 2);
208
209         return oxe;
210 }
211
212 static void osp_oac_xattr_free(struct osp_xattr_entry *oxe)
213 {
214         LASSERT(list_empty(&oxe->oxe_list));
215         if (unlikely(oxe->oxe_largebuf)) {
216                 OBD_FREE_LARGE(oxe->oxe_value, oxe->oxe_buflen);
217                 OBD_FREE(oxe, sizeof(*oxe) + oxe->oxe_namelen + 1);
218         } else {
219                 OBD_FREE(oxe, oxe->oxe_buflen);
220         }
221 }
222
223 /**
224  * Release reference from the OSP object extended attribute entry.
225  *
226  * If it is the last reference, then free the entry.
227  *
228  * \param[in] oxe       pointer to the OSP object extended attribute entry.
229  */
230 static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
231 {
232         if (atomic_dec_and_test(&oxe->oxe_ref))
233                 osp_oac_xattr_free(oxe);
234 }
235
236 /**
237  * Find the named extended attribute in the OSP object attributes cache.
238  *
239  * The caller should take the osp_object::opo_lock before calling
240  * this function.
241  *
242  * \param[in] obj       pointer to the OSP object
243  * \param[in] name      the name of the extended attribute
244  * \param[in] namelen   the name length of the extended attribute
245  *
246  * \retval              pointer to the found extended attribute entry
247  * \retval              NULL if the specified extended attribute is not
248  *                      in the cache
249  */
250 static struct osp_xattr_entry *
251 osp_oac_xattr_find_locked(struct osp_object *obj, const char *name,
252                           size_t namelen)
253 {
254         struct osp_xattr_entry *oxe;
255
256         list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) {
257                 if (namelen == oxe->oxe_namelen &&
258                     strncmp(name, oxe->oxe_name, namelen) == 0)
259                         return oxe;
260         }
261
262         return NULL;
263 }
264
265 /**
266  * Find the named extended attribute in the OSP object attributes cache.
267  *
268  * Call osp_oac_xattr_find_locked() with the osp_object::opo_lock held.
269  *
270  * \param[in] obj       pointer to the OSP object
271  * \param[in] name      the name of the extended attribute
272  * \param[in] unlink    true if the extended attribute entry is to be removed
273  *                      from the cache
274  *
275  * \retval              pointer to the found extended attribute entry
276  * \retval              NULL if the specified extended attribute is not
277  *                      in the cache
278  */
279 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
280                                                   const char *name, bool unlink)
281 {
282         struct osp_xattr_entry *oxe = NULL;
283
284         spin_lock(&obj->opo_lock);
285         oxe = osp_oac_xattr_find_locked(obj, name, strlen(name));
286         if (oxe) {
287                 if (unlink)
288                         list_del_init(&oxe->oxe_list);
289                 else
290                         atomic_inc(&oxe->oxe_ref);
291         }
292         spin_unlock(&obj->opo_lock);
293
294         return oxe;
295 }
296
297 /**
298  * Find the named extended attribute in the OSP object attributes cache.
299  *
300  * If it is not in the cache, then add an empty entry (that will be
301  * filled later) to cache with the given name.
302  *
303  * \param[in] obj       pointer to the OSP object
304  * \param[in] name      the name of the extended attribute
305  * \param[in] len       the length of the extended attribute value
306  *
307  * \retval              pointer to the found or new-created extended
308  *                      attribute entry
309  * \retval              NULL if the specified extended attribute is not in the
310  *                      cache or fail to add new empty entry to the cache.
311  */
312 static struct osp_xattr_entry *
313 osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
314 {
315         struct osp_xattr_entry *oxe;
316         struct osp_xattr_entry *tmp = NULL;
317         size_t namelen = strlen(name);
318
319         oxe = osp_oac_xattr_find(obj, name, false);
320         if (oxe)
321                 return oxe;
322
323         oxe = osp_oac_xattr_alloc(name, namelen, len);
324         if (!oxe)
325                 return NULL;
326
327         spin_lock(&obj->opo_lock);
328         tmp = osp_oac_xattr_find_locked(obj, name, namelen);
329         if (!tmp)
330                 list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list);
331         else
332                 atomic_inc(&tmp->oxe_ref);
333         spin_unlock(&obj->opo_lock);
334
335         if (tmp) {
336                 osp_oac_xattr_free(oxe);
337                 oxe = tmp;
338         }
339
340         return oxe;
341 }
342
343 /**
344  * Check whether \a oxe is large enough to hold the xattr value
345  *
346  * \param[in] oxe       pointer to the OSP object attributes cache xattr entry
347  * \param[in] len       xattr value size in bytes
348  *
349  * \retval              true if xattr can fit in \a oxe
350  * \retval              false if xattr can not fit in \a oxe
351  */
352 static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len)
353 {
354         if (unlikely(oxe->oxe_largebuf))
355                 return oxe->oxe_buflen >= len;
356
357         return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) >= len;
358 }
359
360 /**
361  * Assign the cached OST-object's EA with the given value.
362  *
363  * If the current EA entry in cache has not enough space to hold the new
364  * value, remove it, create a new one, then assign with the given value.
365  *
366  * \param[in] obj       pointer to the OSP object
367  * \param[in] oxe       pointer to the cached EA entry to be assigned
368  * \param[in] buf       pointer to the buffer with new EA value
369  *
370  * \retval              pointer to the new created EA entry in cache if
371  *                      current entry is not big enough; otherwise, the
372  *                      input 'oxe' will be returned.
373  */
374 static struct osp_xattr_entry *
375 osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe,
376                          const struct lu_buf *buf)
377 {
378         struct osp_xattr_entry *new = NULL;
379         struct osp_xattr_entry *old = NULL;
380         int namelen = oxe->oxe_namelen;
381         bool unlink_only = false;
382
383         if (!oxe_can_hold(oxe, buf->lb_len)) {
384                 new = osp_oac_xattr_alloc(oxe->oxe_name, namelen, buf->lb_len);
385                 if (likely(new)) {
386                         __osp_oac_xattr_assignment(obj, new, buf);
387                 } else {
388                         unlink_only = true;
389                         CWARN("%s: cannot update cached xattr %.*s of "DFID"\n",
390                               osp_dto2name(obj), namelen, oxe->oxe_name,
391                               PFID(lu_object_fid(&obj->opo_obj.do_lu)));
392                 }
393         }
394
395         spin_lock(&obj->opo_lock);
396         old = osp_oac_xattr_find_locked(obj, oxe->oxe_name, namelen);
397         if (likely(old)) {
398                 if (new) {
399                         /* Unlink the 'old'. */
400                         list_del_init(&old->oxe_list);
401
402                         /* Drop the ref for 'old' on list. */
403                         osp_oac_xattr_put(old);
404
405                         /* Drop the ref for current using. */
406                         osp_oac_xattr_put(oxe);
407                         oxe = new;
408
409                         /* Insert 'new' into list. */
410                         list_add_tail(&new->oxe_list, &obj->opo_xattr_list);
411                 } else if (unlink_only) {
412                         /* Unlink the 'old'. */
413                         list_del_init(&old->oxe_list);
414
415                         /* Drop the ref for 'old' on list. */
416                         osp_oac_xattr_put(old);
417                 } else {
418                         __osp_oac_xattr_assignment(obj, oxe, buf);
419                 }
420         } else if (new) {
421                 /* Drop the ref for current using. */
422                 osp_oac_xattr_put(oxe);
423                 oxe = new;
424
425                 /* Someone unlinked the 'old' by race,
426                  * insert the 'new' one into list. */
427                 list_add_tail(&new->oxe_list, &obj->opo_xattr_list);
428         }
429         spin_unlock(&obj->opo_lock);
430
431         return oxe;
432 }
433
434 /**
435  * Parse the OSP object attribute from the RPC reply.
436  *
437  * If the attribute is valid, then it will be added to the OSP object
438  * attributes cache.
439  *
440  * \param[in] env       pointer to the thread context
441  * \param[in] reply     pointer to the RPC reply
442  * \param[in] req       pointer to the RPC request
443  * \param[out] attr     pointer to buffer to hold the output attribute
444  * \param[in] obj       pointer to the OSP object
445  * \param[in] index     the index of the attribute buffer in the reply
446  *
447  * \retval              0 for success
448  * \retval              negative error number on failure
449  */
450 static int osp_get_attr_from_reply(const struct lu_env *env,
451                                    struct object_update_reply *reply,
452                                    struct ptlrpc_request *req,
453                                    struct lu_attr *attr,
454                                    struct osp_object *obj, int index)
455 {
456         struct osp_thread_info  *osi    = osp_env_info(env);
457         struct lu_buf           *rbuf   = &osi->osi_lb2;
458         struct obdo             *lobdo  = &osi->osi_obdo;
459         struct obdo             *wobdo;
460         int                     rc;
461
462         rc = object_update_result_data_get(reply, rbuf, index);
463         if (rc < 0)
464                 return rc;
465
466         wobdo = rbuf->lb_buf;
467         if (rbuf->lb_len != sizeof(*wobdo))
468                 return -EPROTO;
469
470         LASSERT(req != NULL);
471         if (req_capsule_req_need_swab(&req->rq_pill))
472                 lustre_swab_obdo(wobdo);
473
474         lustre_get_wire_obdo(NULL, lobdo, wobdo);
475         if (obj) {
476                 spin_lock(&obj->opo_lock);
477                 la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid);
478                 spin_unlock(&obj->opo_lock);
479         }
480         if (attr)
481                 la_from_obdo(attr, lobdo, lobdo->o_valid);
482
483         return 0;
484 }
485
486 /**
487  * Interpreter function for getting OSP object attribute asynchronously.
488  *
489  * Called to interpret the result of an async mode RPC for getting the
490  * OSP object attribute.
491  *
492  * \param[in] env       pointer to the thread context
493  * \param[in] reply     pointer to the RPC reply
494  * \param[in] req       pointer to the RPC request
495  * \param[in] obj       pointer to the OSP object
496  * \param[out] data     pointer to buffer to hold the output attribute
497  * \param[in] index     the index of the attribute buffer in the reply
498  * \param[in] rc        the result for handling the RPC
499  *
500  * \retval              0 for success
501  * \retval              negative error number on failure
502  */
503 static int osp_attr_get_interpterer(const struct lu_env *env,
504                                     struct object_update_reply *reply,
505                                     struct ptlrpc_request *req,
506                                     struct osp_object *obj,
507                                     void *data, int index, int rc)
508 {
509         struct lu_attr *attr = data;
510
511         if (rc == 0) {
512                 osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
513                 obj->opo_non_exist = 0;
514
515                 return osp_get_attr_from_reply(env, reply, req, NULL, obj,
516                                                index);
517         } else {
518                 if (rc == -ENOENT) {
519                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
520                         obj->opo_non_exist = 1;
521                 }
522
523                 spin_lock(&obj->opo_lock);
524                 attr->la_valid = 0;
525                 spin_unlock(&obj->opo_lock);
526         }
527
528         return 0;
529 }
530
531 /**
532  * Implement OSP layer dt_object_operations::do_declare_attr_get() interface.
533  *
534  * Declare that the caller will get attribute from the specified OST object.
535  *
536  * This function adds an Object Unified Target (OUT) sub-request to the per-OSP
537  * based shared asynchronous request queue. The osp_attr_get_interpterer()
538  * is registered as the interpreter function to handle the result of this
539  * sub-request.
540  *
541  * \param[in] env       pointer to the thread context
542  * \param[in] dt        pointer to the OSP layer dt_object
543  *
544  * \retval              0 for success
545  * \retval              negative error number on failure
546  */
547 static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
548 {
549         struct osp_object       *obj    = dt2osp_obj(dt);
550         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
551         int                      rc     = 0;
552
553         mutex_lock(&osp->opd_async_requests_mutex);
554         rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
555                                       &obj->opo_attr, sizeof(struct obdo),
556                                       osp_attr_get_interpterer);
557         mutex_unlock(&osp->opd_async_requests_mutex);
558
559         return rc;
560 }
561
562 /**
563  * Implement OSP layer dt_object_operations::do_attr_get() interface.
564  *
565  * Get attribute from the specified MDT/OST object.
566  *
567  * If the attribute is in the OSP object attributes cache, then return
568  * the cached attribute directly. Otherwise it will trigger an OUT RPC
569  * to the peer to get the attribute synchronously, if successful, add it
570  * to the OSP attributes cache. (\see lustre/osp/osp_trans.c for OUT RPC.)
571  *
572  * \param[in] env       pointer to the thread context
573  * \param[in] dt        pointer to the OSP layer dt_object
574  * \param[out] attr     pointer to the buffer to hold the output attribute
575  *
576  * \retval              0 for success
577  * \retval              negative error number on failure
578  */
579 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
580                  struct lu_attr *attr)
581 {
582         struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
583         struct osp_object               *obj = dt2osp_obj(dt);
584         struct dt_device                *dev = &osp->opd_dt_dev;
585         struct osp_update_request       *update;
586         struct object_update_reply      *reply;
587         struct ptlrpc_request           *req = NULL;
588         int                             invalidated, cache = 0, rc = 0;
589         ENTRY;
590
591         if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
592                 RETURN(-ENOENT);
593         if (obj->opo_destroyed)
594                 RETURN(-ENOENT);
595
596         spin_lock(&obj->opo_lock);
597         if (obj->opo_attr.la_valid != 0 && !obj->opo_stale) {
598                 *attr = obj->opo_attr;
599                 spin_unlock(&obj->opo_lock);
600
601                 RETURN(0);
602         }
603         spin_unlock(&obj->opo_lock);
604
605         update = osp_update_request_create(dev);
606         if (IS_ERR(update))
607                 RETURN(PTR_ERR(update));
608
609         rc = OSP_UPDATE_RPC_PACK(env, out_attr_get_pack, update,
610                                  lu_object_fid(&dt->do_lu));
611         if (rc != 0) {
612                 CERROR("%s: Insert update error "DFID": rc = %d\n",
613                        dev->dd_lu_dev.ld_obd->obd_name,
614                        PFID(lu_object_fid(&dt->do_lu)), rc);
615
616                 GOTO(out_req, rc);
617         }
618
619         invalidated = atomic_read(&obj->opo_invalidate_seq);
620
621         rc = osp_remote_sync(env, osp, update, &req);
622
623         down_read(&obj->opo_invalidate_sem);
624         if (invalidated == atomic_read(&obj->opo_invalidate_seq)) {
625                 /* no invalited has came so far, we can cache the attrs */
626                 cache = 1;
627         }
628
629         if (rc != 0) {
630                 if (rc == -ENOENT) {
631                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
632                         if (cache)
633                                 obj->opo_non_exist = 1;
634                 } else {
635                         CERROR("%s: osp_attr_get update error "DFID": rc = %d\n",
636                                dev->dd_lu_dev.ld_obd->obd_name,
637                                PFID(lu_object_fid(&dt->do_lu)), rc);
638                 }
639
640                 GOTO(out, rc);
641         }
642
643         osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
644         obj->opo_non_exist = 0;
645         reply = req_capsule_server_sized_get(&req->rq_pill,
646                                              &RMF_OUT_UPDATE_REPLY,
647                                              OUT_UPDATE_REPLY_SIZE);
648         if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
649                 GOTO(out, rc = -EPROTO);
650
651         rc = osp_get_attr_from_reply(env, reply, req, attr,
652                                      cache ? obj : NULL, 0);
653         if (rc != 0)
654                 GOTO(out, rc);
655
656         spin_lock(&obj->opo_lock);
657         if (cache)
658                 obj->opo_stale = 0;
659         spin_unlock(&obj->opo_lock);
660
661         GOTO(out, rc);
662
663 out:
664         up_read(&obj->opo_invalidate_sem);
665
666 out_req:
667         if (req != NULL)
668                 ptlrpc_req_finished(req);
669
670         osp_update_request_destroy(env, update);
671
672         return rc;
673 }
674
675 /**
676  * Implement OSP layer dt_object_operations::do_declare_attr_set() interface.
677  *
678  * If the transaction is not remote one, then declare the credits that will
679  * be used for the subsequent llog record for the object's attributes.
680  *
681  * \param[in] env       pointer to the thread context
682  * \param[in] dt        pointer to the OSP layer dt_object
683  * \param[in] attr      pointer to the attribute to be set
684  * \param[in] th        pointer to the transaction handler
685  *
686  * \retval              0 for success
687  * \retval              negative error number on failure
688  */
689 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
690                                 const struct lu_attr *attr, struct thandle *th)
691 {
692         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
693         struct osp_object       *o = dt2osp_obj(dt);
694         int                     rc;
695
696         if (is_only_remote_trans(th))
697                 return osp_md_declare_attr_set(env, dt, attr, th);
698         /*
699          * Usually we don't allow server stack to manipulate size
700          * but there is a special case when striping is created
701          * late, after stripeless file got truncated to non-zero.
702          *
703          * In this case we do the following:
704          *
705          * 1) grab id in declare - this can lead to leaked OST objects
706          *    but we don't currently have proper mechanism and the only
707          *    options we have are to do truncate RPC holding transaction
708          *    open (very bad) or to grab id in declare at cost of leaked
709          *    OST object in same very rare unfortunate case (just bad)
710          *    notice 1.6-2.0 do assignment outside of running transaction
711          *    all the time, meaning many more chances for leaked objects.
712          *
713          * 2) send synchronous truncate RPC with just assigned id
714          */
715
716         /* there are few places in MDD code still passing NULL
717          * XXX: to be fixed soon */
718         if (attr == NULL)
719                 RETURN(0);
720
721         if (attr->la_valid & LA_SIZE && attr->la_size > 0 &&
722             fid_is_zero(lu_object_fid(&o->opo_obj.do_lu))) {
723                 LASSERT(!dt_object_exists(dt));
724                 osp_object_assign_fid(env, d, o);
725                 rc = osp_object_truncate(env, dt, attr->la_size);
726                 if (rc != 0)
727                         RETURN(rc);
728         }
729
730         if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
731                 RETURN(0);
732
733         /* track all UID/GID, projid, and layout version changes via llog */
734         rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
735
736         return 0;
737 }
738
739 /**
740  * Implement OSP layer dt_object_operations::do_attr_set() interface.
741  *
742  * Set attribute to the specified OST object.
743  *
744  * If the transaction is a remote one, then add OUT_ATTR_SET sub-request
745  * in the OUT RPC that will be flushed when the remote transaction stop.
746  * Otherwise, it will generate a MDS_SETATTR64_REC record in the llog that
747  * will be handled by a dedicated thread asynchronously.
748  *
749  * If the attribute entry exists in the OSP object attributes cache,
750  * then update the cached attribute according to given attribute.
751  *
752  * \param[in] env       pointer to the thread context
753  * \param[in] dt        pointer to the OSP layer dt_object
754  * \param[in] attr      pointer to the attribute to be set
755  * \param[in] th        pointer to the transaction handler
756  *
757  * \retval              0 for success
758  * \retval              negative error number on failure
759  */
760 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
761                         const struct lu_attr *attr, struct thandle *th)
762 {
763         struct osp_object       *o = dt2osp_obj(dt);
764         int                      rc = 0;
765         ENTRY;
766
767         /* we're interested in uid/gid/projid/layout version changes,
768          * and also specific setting of enc flag
769          */
770         if (!(attr->la_valid & LA_REMOTE_ATTR_SET) &&
771             !(attr->la_valid == LA_FLAGS &&
772               attr->la_flags == LUSTRE_ENCRYPT_FL))
773                 RETURN(0);
774
775         if (!is_only_remote_trans(th)) {
776                 if (attr->la_flags & LUSTRE_SET_SYNC_FL) {
777                         struct ptlrpc_request *req = NULL;
778                         struct osp_update_request *update = NULL;
779                         struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
780
781                         update = osp_update_request_create(&osp->opd_dt_dev);
782                         if (IS_ERR(update))
783                                 RETURN(PTR_ERR(update));
784
785                         rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update,
786                                                  lu_object_fid(&dt->do_lu),
787                                                  attr);
788                         if (rc != 0) {
789                                 CERROR("%s: update error "DFID": rc = %d\n",
790                                        osp->opd_obd->obd_name,
791                                        PFID(lu_object_fid(&dt->do_lu)), rc);
792
793                                 osp_update_request_destroy(env, update);
794                                 RETURN(rc);
795                         }
796
797                         rc = osp_remote_sync(env, osp, update, &req);
798                         if (req != NULL)
799                                 ptlrpc_req_finished(req);
800
801                         osp_update_request_destroy(env, update);
802                 } else {
803                         struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
804
805                         rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
806                         /* send layout version to OST ASAP */
807                         if (attr->la_valid & LA_LAYOUT_VERSION)
808                                 wake_up(&osp->opd_sync_waitq);
809                         /* XXX: send new uid/gid to OST ASAP? */
810                 }
811         } else {
812                 struct lu_attr  *la;
813
814                 /* It is for OST-object attr_set directly without updating
815                  * local MDT-object attribute. It is usually used by LFSCK. */
816                 rc = osp_md_attr_set(env, dt, attr, th);
817                 CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n",
818                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
819
820                 if (rc != 0)
821                         RETURN(rc);
822
823                 /* Update the OSP object attributes cache. */
824                 la = &o->opo_attr;
825                 spin_lock(&o->opo_lock);
826                 if (attr->la_valid & LA_UID) {
827                         la->la_uid = attr->la_uid;
828                         la->la_valid |= LA_UID;
829                 }
830
831                 if (attr->la_valid & LA_GID) {
832                         la->la_gid = attr->la_gid;
833                         la->la_valid |= LA_GID;
834                 }
835                 if (attr->la_valid & LA_PROJID) {
836                         la->la_projid = attr->la_projid;
837                         la->la_valid |= LA_PROJID;
838                 }
839                 spin_unlock(&o->opo_lock);
840         }
841
842         RETURN(rc);
843 }
844
845 /**
846  * Interpreter function for getting OSP object extended attribute asynchronously
847  *
848  * Called to interpret the result of an async mode RPC for getting the
849  * OSP object extended attribute.
850  *
851  * \param[in] env       pointer to the thread context
852  * \param[in] reply     pointer to the RPC reply
853  * \param[in] req       pointer to the RPC request
854  * \param[in] obj       pointer to the OSP object
855  * \param[out] data     pointer to OSP object attributes cache
856  * \param[in] index     the index of the attribute buffer in the reply
857  * \param[in] rc        the result for handling the RPC
858  *
859  * \retval              0 for success
860  * \retval              negative error number on failure
861  */
862 static int osp_xattr_get_interpterer(const struct lu_env *env,
863                                      struct object_update_reply *reply,
864                                      struct ptlrpc_request *req,
865                                      struct osp_object *obj,
866                                      void *data, int index, int rc)
867 {
868         struct osp_xattr_entry *oxe = data;
869
870         spin_lock(&obj->opo_lock);
871         if (rc >= 0) {
872                 struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
873
874                 rc = object_update_result_data_get(reply, rbuf, index);
875                 if (rc == -ENOENT || rc == -ENODATA || rc == 0) {
876                         oxe->oxe_exist = 0;
877                         oxe->oxe_ready = 1;
878                         goto unlock;
879                 }
880
881                 if (unlikely(rc < 0) || !oxe_can_hold(oxe, rbuf->lb_len)) {
882                         oxe->oxe_ready = 0;
883                         goto unlock;
884                 }
885
886                 __osp_oac_xattr_assignment(obj, oxe, rbuf);
887         } else if (rc == -ENOENT || rc == -ENODATA) {
888                 oxe->oxe_exist = 0;
889                 oxe->oxe_ready = 1;
890         } else {
891                 oxe->oxe_ready = 0;
892         }
893
894 unlock:
895         spin_unlock(&obj->opo_lock);
896
897         /* Put the reference obtained in the osp_declare_xattr_get(). */
898         osp_oac_xattr_put(oxe);
899
900         return 0;
901 }
902
903 /**
904  * Implement OSP dt_object_operations::do_declare_xattr_get() interface.
905  *
906  * Declare that the caller will get extended attribute from the specified
907  * OST object.
908  *
909  * This function will add an OUT_XATTR_GET sub-request to the per OSP
910  * based shared asynchronous request queue with the interpreter function:
911  * osp_xattr_get_interpterer().
912  *
913  * \param[in] env       pointer to the thread context
914  * \param[in] dt        pointer to the OSP layer dt_object
915  * \param[out] buf      pointer to the lu_buf to hold the extended attribute
916  * \param[in] name      the name for the expected extended attribute
917  *
918  * \retval              0 for success
919  * \retval              negative error number on failure
920  */
921 static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
922                                  struct lu_buf *buf, const char *name)
923 {
924         struct osp_object       *obj     = dt2osp_obj(dt);
925         struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
926         struct osp_xattr_entry  *oxe;
927         int                      rc      = 0;
928         __u16 len;
929
930         LASSERT(buf != NULL);
931         LASSERT(name != NULL);
932
933         if (unlikely(buf->lb_len == 0))
934                 return -EINVAL;
935
936         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
937         if (oxe == NULL)
938                 return -ENOMEM;
939
940         len = strlen(name) + 1;
941         mutex_lock(&osp->opd_async_requests_mutex);
942         rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
943                                       &len, (const void **)&name,
944                                       oxe, buf->lb_len,
945                                       osp_xattr_get_interpterer);
946         if (rc != 0) {
947                 mutex_unlock(&osp->opd_async_requests_mutex);
948                 osp_oac_xattr_put(oxe);
949         } else {
950                 struct osp_update_request *our;
951                 struct osp_update_request_sub *ours;
952
953                 /* XXX: Currently, we trigger the batched async OUT
954                  *      RPC via dt_declare_xattr_get(). It is not
955                  *      perfect solution, but works well now.
956                  *
957                  *      We will improve it in the future. */
958                 our = osp->opd_async_requests;
959                 ours = osp_current_object_update_request(our);
960                 if (ours != NULL && ours->ours_req != NULL &&
961                     ours->ours_req->ourq_count > 0) {
962                         osp->opd_async_requests = NULL;
963                         mutex_unlock(&osp->opd_async_requests_mutex);
964                         rc = osp_unplug_async_request(env, osp, our);
965                 } else {
966                         mutex_unlock(&osp->opd_async_requests_mutex);
967                 }
968         }
969
970         return rc;
971 }
972
973 /**
974  * Implement OSP layer dt_object_operations::do_xattr_get() interface.
975  *
976  * Get extended attribute from the specified MDT/OST object.
977  *
978  * If the extended attribute is in the OSP object attributes cache, then
979  * return the cached extended attribute directly. Otherwise it will get
980  * the extended attribute synchronously, if successful, add it to the OSP
981  * attributes cache. (\see lustre/osp/osp_trans.c for OUT RPC.)
982  *
983  * There is a race condition: some other thread has added the named extended
984  * attributed entry to the OSP object attributes cache during the current
985  * OUT_XATTR_GET handling. If such case happens, the OSP will replace the
986  * (just) existing extended attribute entry with the new replied one.
987  *
988  * \param[in] env       pointer to the thread context
989  * \param[in] dt        pointer to the OSP layer dt_object
990  * \param[out] buf      pointer to the lu_buf to hold the extended attribute
991  * \param[in] name      the name for the expected extended attribute
992  *
993  * \retval              0 for success
994  * \retval              negative error number on failure
995  */
996 int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
997                   struct lu_buf *buf, const char *name)
998 {
999         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
1000         struct osp_object       *obj    = dt2osp_obj(dt);
1001         struct dt_device        *dev    = &osp->opd_dt_dev;
1002         struct lu_buf           *rbuf   = &osp_env_info(env)->osi_lb2;
1003         struct osp_update_request *update = NULL;
1004         struct ptlrpc_request   *req    = NULL;
1005         struct object_update_reply *reply;
1006         struct osp_xattr_entry  *oxe    = NULL;
1007         const char *dname = osp_dto2name(obj);
1008         int invalidated, rc = 0;
1009         ENTRY;
1010
1011         LASSERT(buf != NULL);
1012         LASSERT(name != NULL);
1013
1014         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
1015             osp->opd_index == cfs_fail_val) {
1016                 if (is_ost_obj(&dt->do_lu)) {
1017                         if (osp_dev2node(osp) == cfs_fail_val)
1018                                 RETURN(-ENOTCONN);
1019                 } else {
1020                         if (strcmp(name, XATTR_NAME_LINK) == 0)
1021                                 RETURN(-ENOTCONN);
1022                 }
1023         }
1024
1025         if (unlikely(obj->opo_non_exist))
1026                 RETURN(-ENOENT);
1027
1028         invalidated = atomic_read(&obj->opo_invalidate_seq);
1029
1030         oxe = osp_oac_xattr_find(obj, name, false);
1031         if (oxe != NULL) {
1032                 spin_lock(&obj->opo_lock);
1033                 if (oxe->oxe_ready) {
1034                         if (!oxe->oxe_exist)
1035                                 GOTO(unlock, rc = -ENODATA);
1036
1037                         if (buf->lb_buf == NULL)
1038                                 GOTO(unlock, rc = oxe->oxe_vallen);
1039
1040                         if (buf->lb_len < oxe->oxe_vallen)
1041                                 GOTO(unlock, rc = -ERANGE);
1042
1043                         memcpy(buf->lb_buf, oxe->oxe_value,
1044                                oxe->oxe_vallen);
1045
1046                         GOTO(unlock, rc = oxe->oxe_vallen);
1047
1048 unlock:
1049                         spin_unlock(&obj->opo_lock);
1050                         osp_oac_xattr_put(oxe);
1051
1052                         return rc;
1053                 }
1054                 spin_unlock(&obj->opo_lock);
1055         }
1056         update = osp_update_request_create(dev);
1057         if (IS_ERR(update))
1058                 GOTO(out_req, rc = PTR_ERR(update));
1059
1060         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_get_pack, update,
1061                                  lu_object_fid(&dt->do_lu), name, buf->lb_len);
1062         if (rc != 0) {
1063                 CERROR("%s: Insert update error "DFID": rc = %d\n",
1064                        dname, PFID(lu_object_fid(&dt->do_lu)), rc);
1065                 GOTO(out_req, rc);
1066         }
1067
1068         rc = osp_remote_sync(env, osp, update, &req);
1069
1070         down_read(&obj->opo_invalidate_sem);
1071         if (invalidated != atomic_read(&obj->opo_invalidate_seq)) {
1072                 /* invalidated has been requested, we can't cache the result */
1073                 if (rc < 0) {
1074                         if (rc == -ENOENT)
1075                                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
1076                         GOTO(out, rc);
1077                 }
1078                 reply = req_capsule_server_sized_get(&req->rq_pill,
1079                                                      &RMF_OUT_UPDATE_REPLY,
1080                                 OUT_UPDATE_REPLY_SIZE);
1081                 if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
1082                         CERROR("%s: Wrong version %x expected %x "DFID
1083                                ": rc = %d\n", dname, reply->ourp_magic,
1084                                UPDATE_REPLY_MAGIC,
1085                                PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
1086                         GOTO(out, rc = -EPROTO);
1087                 }
1088                 rc = object_update_result_data_get(reply, rbuf, 0);
1089                 GOTO(out, rc);
1090         }
1091
1092         if (rc < 0) {
1093                 if (rc == -ENOENT) {
1094                         dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
1095                         obj->opo_non_exist = 1;
1096                 }
1097
1098                 if (oxe == NULL)
1099                         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
1100
1101                 if (oxe == NULL) {
1102                         CWARN("%s: Fail to add xattr (%s) to cache for "
1103                               DFID" (1): rc = %d\n", dname, name,
1104                               PFID(lu_object_fid(&dt->do_lu)), rc);
1105
1106                         GOTO(out, rc);
1107                 }
1108
1109                 spin_lock(&obj->opo_lock);
1110                 if (rc == -ENOENT || rc == -ENODATA) {
1111                         oxe->oxe_exist = 0;
1112                         oxe->oxe_ready = 1;
1113                 } else {
1114                         oxe->oxe_ready = 0;
1115                 }
1116                 spin_unlock(&obj->opo_lock);
1117
1118                 GOTO(out, rc);
1119         }
1120
1121         reply = req_capsule_server_sized_get(&req->rq_pill,
1122                                              &RMF_OUT_UPDATE_REPLY,
1123                                              OUT_UPDATE_REPLY_SIZE);
1124         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
1125                 CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
1126                        dname, reply->ourp_magic, UPDATE_REPLY_MAGIC,
1127                        PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
1128
1129                 GOTO(out, rc = -EPROTO);
1130         }
1131
1132         rc = object_update_result_data_get(reply, rbuf, 0);
1133         if (rc < 0 || rbuf->lb_len == 0) {
1134                 if (oxe == NULL && rc == -ENODATA) {
1135                         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
1136                         if (oxe == NULL) {
1137                                 rc = -ENOMEM;
1138                                 CWARN("%s: Fail to add xattr (%s) to cache for "
1139                                       DFID" (1): rc = %d\n", dname, name,
1140                                       PFID(lu_object_fid(&dt->do_lu)), rc);
1141                                 GOTO(out, rc);
1142                         }
1143                 }
1144
1145                 if (oxe) {
1146                         spin_lock(&obj->opo_lock);
1147                         if (unlikely(rc == -ENODATA)) {
1148                                 oxe->oxe_exist = 0;
1149                                 oxe->oxe_ready = 1;
1150                         } else {
1151                                 oxe->oxe_ready = 0;
1152                         }
1153                         spin_unlock(&obj->opo_lock);
1154                 }
1155
1156                 GOTO(out, rc);
1157         }
1158
1159         /* For detecting EA size. */
1160         if (!buf->lb_buf)
1161                 GOTO(out, rc);
1162
1163         if (!oxe) {
1164                 oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
1165                 if (!oxe) {
1166                         CWARN("%s: Fail to add xattr (%s) to "
1167                               "cache for "DFID" (2): rc = %d\n",
1168                               dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
1169
1170                         GOTO(out, rc);
1171                 }
1172         }
1173
1174         oxe = osp_oac_xattr_assignment(obj, oxe, rbuf);
1175
1176         GOTO(out, rc);
1177
1178 out:
1179         up_read(&obj->opo_invalidate_sem);
1180
1181 out_req:
1182         if (rc > 0 && buf->lb_buf) {
1183                 if (unlikely(buf->lb_len < rbuf->lb_len))
1184                         rc = -ERANGE;
1185                 else
1186                         memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
1187         }
1188
1189         if (req)
1190                 ptlrpc_req_finished(req);
1191
1192         if (update && !IS_ERR(update))
1193                 osp_update_request_destroy(env, update);
1194
1195         if (oxe)
1196                 osp_oac_xattr_put(oxe);
1197
1198         return rc;
1199 }
1200
1201 /**
1202  * Implement OSP layer dt_object_operations::do_declare_xattr_set() interface.
1203  *
1204  * Declare that the caller will set extended attribute to the specified
1205  * MDT/OST object.
1206  *
1207  * If it is non-remote transaction, it will add an OUT_XATTR_SET sub-request
1208  * to the OUT RPC that will be flushed when the transaction start. And if the
1209  * OSP attributes cache is initialized, then check whether the name extended
1210  * attribute entry exists in the cache or not. If yes, replace it; otherwise,
1211  * add the extended attribute to the cache.
1212  *
1213  * \param[in] env       pointer to the thread context
1214  * \param[in] dt        pointer to the OSP layer dt_object
1215  * \param[in] buf       pointer to the lu_buf to hold the extended attribute
1216  * \param[in] name      the name of the extended attribute to be set
1217  * \param[in] flag      to indicate the detailed set operation: LU_XATTR_CREATE
1218  *                      or LU_XATTR_REPLACE or others
1219  * \param[in] th        pointer to the transaction handler
1220  *
1221  * \retval              0 for success
1222  * \retval              negative error number on failure
1223  */
1224 int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
1225                           const struct lu_buf *buf, const char *name,
1226                           int flag, struct thandle *th)
1227 {
1228         return osp_trans_update_request_create(th);
1229 }
1230
1231 /**
1232  * Implement OSP layer dt_object_operations::do_xattr_set() interface.
1233  *
1234  * Set extended attribute to the specified MDT/OST object.
1235  *
1236  * Add an OUT_XATTR_SET sub-request into the OUT RPC that will be flushed in
1237  * the transaction stop. And if the OSP attributes cache is initialized, then
1238  * check whether the name extended attribute entry exists in the cache or not.
1239  * If yes, replace it; otherwise, add the extended attribute to the cache.
1240  *
1241  * \param[in] env       pointer to the thread context
1242  * \param[in] dt        pointer to the OSP layer dt_object
1243  * \param[in] buf       pointer to the lu_buf to hold the extended attribute
1244  * \param[in] name      the name of the extended attribute to be set
1245  * \param[in] fl        to indicate the detailed set operation: LU_XATTR_CREATE
1246  *                      or LU_XATTR_REPLACE or others
1247  * \param[in] th        pointer to the transaction handler
1248  *
1249  * \retval              0 for success
1250  * \retval              negative error number on failure
1251  */
1252 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
1253                   const struct lu_buf *buf, const char *name, int fl,
1254                   struct thandle *th)
1255 {
1256         struct osp_object *o = dt2osp_obj(dt);
1257         struct osp_update_request *update;
1258         struct osp_xattr_entry *oxe;
1259         int rc;
1260         ENTRY;
1261
1262         update = thandle_to_osp_update_request(th);
1263         LASSERT(update != NULL);
1264
1265         CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
1266                PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
1267
1268         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_set_pack, update,
1269                                  lu_object_fid(&dt->do_lu), buf, name, fl);
1270         if (rc != 0)
1271                 RETURN(rc);
1272
1273         /* Do not cache linkEA that may be self-adjusted by peers
1274          * under EA overflow case. */
1275         if (strcmp(name, XATTR_NAME_LINK) == 0) {
1276                 oxe = osp_oac_xattr_find(o, name, true);
1277                 if (oxe != NULL)
1278                         osp_oac_xattr_put(oxe);
1279
1280                 RETURN(0);
1281         }
1282
1283         oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
1284         if (oxe == NULL) {
1285                 CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
1286                       osp_dto2name(o), name, PFID(lu_object_fid(&dt->do_lu)));
1287
1288                 RETURN(0);
1289         }
1290
1291         oxe = osp_oac_xattr_assignment(o, oxe, buf);
1292         if (oxe)
1293                 osp_oac_xattr_put(oxe);
1294
1295         RETURN(0);
1296 }
1297
1298 /**
1299  * Implement OSP layer dt_object_operations::do_declare_xattr_del() interface.
1300  *
1301  * Declare that the caller will delete extended attribute on the specified
1302  * MDT/OST object.
1303  *
1304  * If it is non-remote transaction, it will add an OUT_XATTR_DEL sub-request
1305  * to the OUT RPC that will be flushed when the transaction start. And if the
1306  * name extended attribute entry exists in the OSP attributes cache, then remove
1307  * it from the cache.
1308  *
1309  * \param[in] env       pointer to the thread context
1310  * \param[in] dt        pointer to the OSP layer dt_object
1311  * \param[in] name      the name of the extended attribute to be set
1312  * \param[in] th        pointer to the transaction handler
1313  *
1314  * \retval              0 for success
1315  * \retval              negative error number on failure
1316  */
1317 int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
1318                           const char *name, struct thandle *th)
1319 {
1320         return osp_trans_update_request_create(th);
1321 }
1322
1323 /**
1324  * Implement OSP layer dt_object_operations::do_xattr_del() interface.
1325  *
1326  * Delete extended attribute on the specified MDT/OST object.
1327  *
1328  * If it is remote transaction, it will add an OUT_XATTR_DEL sub-request into
1329  * the OUT RPC that will be flushed when the transaction stop. And if the name
1330  * extended attribute entry exists in the OSP attributes cache, then remove it
1331  * from the cache.
1332  *
1333  * \param[in] env       pointer to the thread context
1334  * \param[in] dt        pointer to the OSP layer dt_object
1335  * \param[in] name      the name of the extended attribute to be set
1336  * \param[in] th        pointer to the transaction handler
1337  *
1338  * \retval              0 for success
1339  * \retval              negative error number on failure
1340  */
1341 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
1342                   const char *name, struct thandle *th)
1343 {
1344         struct osp_update_request *update;
1345         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1346         struct osp_object *o = dt2osp_obj(dt);
1347         struct osp_xattr_entry *oxe;
1348         int rc;
1349
1350         update = thandle_to_osp_update_request(th);
1351         LASSERT(update != NULL);
1352
1353         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_del_pack, update, fid, name);
1354         if (rc != 0)
1355                 return rc;
1356
1357         oxe = osp_oac_xattr_find(o, name, true);
1358         if (oxe != NULL)
1359                 /* Drop the ref for entry on list. */
1360                 osp_oac_xattr_put(oxe);
1361
1362         return 0;
1363 }
1364
1365 void osp_obj_invalidate_cache(struct osp_object *obj)
1366 {
1367         struct osp_xattr_entry *oxe;
1368         struct osp_xattr_entry *tmp;
1369
1370         spin_lock(&obj->opo_lock);
1371         list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
1372                 oxe->oxe_ready = 0;
1373                 list_del_init(&oxe->oxe_list);
1374                 osp_oac_xattr_put(oxe);
1375         }
1376         obj->opo_attr.la_valid = 0;
1377         spin_unlock(&obj->opo_lock);
1378 }
1379
1380 /**
1381  * Implement OSP layer dt_object_operations::do_invalidate() interface.
1382  *
1383  * Invalidate attributes cached on the specified MDT/OST object.
1384  *
1385  * \param[in] env       pointer to the thread context
1386  * \param[in] dt        pointer to the OSP layer dt_object
1387  *
1388  * \retval              0 for success
1389  * \retval              negative error number on failure
1390  */
1391 int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
1392 {
1393         struct osp_object *obj = dt2osp_obj(dt);
1394         ENTRY;
1395
1396         CDEBUG(D_HA, "Invalidate osp_object "DFID"\n",
1397                PFID(lu_object_fid(&dt->do_lu)));
1398
1399         /* serialize attr/EA set vs. invalidation */
1400         down_write(&obj->opo_invalidate_sem);
1401
1402         /* this should invalidate all in-flights */
1403         atomic_inc(&obj->opo_invalidate_seq);
1404
1405         spin_lock(&obj->opo_lock);
1406         /* do not mark new objects stale */
1407         if (obj->opo_attr.la_valid)
1408                 obj->opo_stale = 1;
1409         obj->opo_non_exist = 0;
1410         spin_unlock(&obj->opo_lock);
1411
1412         osp_obj_invalidate_cache(obj);
1413
1414         up_write(&obj->opo_invalidate_sem);
1415
1416         RETURN(0);
1417 }
1418
1419 bool osp_check_stale(struct dt_object *dt)
1420 {
1421         struct osp_object *obj = dt2osp_obj(dt);
1422
1423         if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
1424                 return true;
1425
1426         return obj->opo_stale;
1427 }
1428
1429
1430 /**
1431  * Implement OSP layer dt_object_operations::do_declare_create() interface.
1432  *
1433  * Declare that the caller will create the OST object.
1434  *
1435  * If the transaction is a remote transaction and the FID for the OST-object
1436  * has been assigned already, then handle it as creating (remote) MDT object
1437  * via osp_md_declare_create(). This function is usually used for LFSCK
1438  * to re-create the lost OST object. Otherwise, if it is not replay case, the
1439  * OSP will reserve pre-created object for the subsequent create operation;
1440  * if the MDT side cached pre-created objects are less than some threshold,
1441  * then it will wakeup the pre-create thread.
1442  *
1443  * \param[in] env       pointer to the thread context
1444  * \param[in] dt        pointer to the OSP layer dt_object
1445  * \param[in] attr      the attribute for the object to be created
1446  * \param[in] hint      pointer to the hint for creating the object, such as
1447  *                      the parent object
1448  * \param[in] dof       pointer to the dt_object_format for help the creation
1449  * \param[in] th        pointer to the transaction handler
1450  *
1451  * \retval              0 for success
1452  * \retval              negative error number on failure
1453  */
1454 static int osp_declare_create(const struct lu_env *env, struct dt_object *dt,
1455                               struct lu_attr *attr,
1456                               struct dt_allocation_hint *hint,
1457                               struct dt_object_format *dof, struct thandle *th)
1458 {
1459         struct osp_thread_info  *osi = osp_env_info(env);
1460         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
1461         struct osp_object       *o = dt2osp_obj(dt);
1462         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1463         struct thandle          *local_th;
1464         int                      rc = 0;
1465
1466         ENTRY;
1467
1468         if (is_only_remote_trans(th) && !fid_is_zero(fid)) {
1469                 LASSERT(fid_is_sane(fid));
1470
1471                 rc = osp_md_declare_create(env, dt, attr, hint, dof, th);
1472
1473                 RETURN(rc);
1474         }
1475
1476         /* should happen to non-0 OSP only so that at least one object
1477          * has been already declared in the scenario and LOD should
1478          * cleanup that */
1479         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
1480                 RETURN(-ENOSPC);
1481
1482         LASSERT(d->opd_last_used_oid_file);
1483
1484         /*
1485          * There can be gaps in precreated ids and record to unlink llog
1486          * XXX: we do not handle gaps yet, implemented before solution
1487          *      was found to be racy, so we disabled that. there is no
1488          *      point in making useless but expensive llog declaration.
1489          */
1490         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
1491
1492         local_th = osp_get_storage_thandle(env, th, d);
1493         if (IS_ERR(local_th))
1494                 RETURN(PTR_ERR(local_th));
1495
1496         if (unlikely(!fid_is_zero(fid))) {
1497                 /* replay case: caller knows fid */
1498                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
1499                                    d->opd_index);
1500                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1501                                              &osi->osi_lb, osi->osi_off,
1502                                              local_th);
1503                 RETURN(rc);
1504         }
1505
1506         /*
1507          * in declaration we need to reserve object so that we don't block
1508          * awaiting precreation RPC to complete
1509          */
1510         rc = osp_precreate_reserve(env, d, !hint || hint->dah_can_block);
1511         /*
1512          * we also need to declare update to local "last used id" file for
1513          * recovery if object isn't used for a reason, we need to release
1514          * reservation, this can be made in osd_object_release()
1515          */
1516         if (rc == 0) {
1517                 /* mark id is reserved: in create we don't want to talk
1518                  * to OST */
1519                 LASSERT(o->opo_reserved == 0);
1520                 o->opo_reserved = 1;
1521
1522                 /* common for all OSPs file hystorically */
1523                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
1524                                    d->opd_index);
1525                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1526                                              &osi->osi_lb, osi->osi_off,
1527                                              local_th);
1528         } else {
1529                 /* not needed in the cache anymore */
1530                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1531                             &dt->do_lu.lo_header->loh_flags);
1532         }
1533         RETURN(rc);
1534 }
1535
1536 /**
1537  * Implement OSP layer dt_object_operations::do_create() interface.
1538  *
1539  * Create the OST object.
1540  *
1541  * If the transaction is a remote transaction and the FID for the OST-object
1542  * has been assigned already, then handle it as handling MDT object via the
1543  * osp_md_create(). For other cases, the OSP will assign FID to the
1544  * object to be created, and update last_used Object ID (OID) file.
1545  *
1546  * \param[in] env       pointer to the thread context
1547  * \param[in] dt        pointer to the OSP layer dt_object
1548  * \param[in] attr      the attribute for the object to be created
1549  * \param[in] hint      pointer to the hint for creating the object, such as
1550  *                      the parent object
1551  * \param[in] dof       pointer to the dt_object_format for help the creation
1552  * \param[in] th        pointer to the transaction handler
1553  *
1554  * \retval              0 for success
1555  * \retval              negative error number on failure
1556  */
1557 static int osp_create(const struct lu_env *env, struct dt_object *dt,
1558                       struct lu_attr *attr, struct dt_allocation_hint *hint,
1559                       struct dt_object_format *dof, struct thandle *th)
1560 {
1561         struct osp_thread_info *osi = osp_env_info(env);
1562         struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev);
1563         struct osp_object *o = dt2osp_obj(dt);
1564         struct lu_fid *fid = &osi->osi_fid;
1565         struct thandle *local_th;
1566         bool replay = false;
1567         int rc = 0;
1568
1569         ENTRY;
1570
1571         if (is_only_remote_trans(th) &&
1572             !fid_is_zero(lu_object_fid(&dt->do_lu))) {
1573                 LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu)));
1574
1575                 rc = osp_md_create(env, dt, attr, hint, dof, th);
1576                 if (rc == 0)
1577                         o->opo_non_exist = 0;
1578
1579                 RETURN(rc);
1580         }
1581
1582         o->opo_non_exist = 0;
1583         if (o->opo_reserved) {
1584                 /* regular case, fid is assigned holding transaction open */
1585                  osp_object_assign_fid(env, d, o);
1586         } else {
1587                 replay = true;
1588         }
1589
1590         memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
1591
1592         LASSERTF(fid_is_sane(fid), "fid for osp_object %px is insane"DFID"!\n",
1593                  o, PFID(fid));
1594
1595         if (replay) {
1596                 /* special case, id was assigned outside of transaction
1597                  * see comments in osp_declare_attr_set */
1598                 LASSERT(d->opd_pre != NULL);
1599                 spin_lock(&d->opd_pre_lock);
1600                 osp_update_last_fid(d, fid, true);
1601                 spin_unlock(&d->opd_pre_lock);
1602         }
1603
1604         CDEBUG(D_INODE, "fid for osp_object %p is "DFID"\n", o, PFID(fid));
1605
1606         /* If the precreate ends, it means it will be ready to rollover to
1607          * the new sequence soon, all the creation should be synchronized,
1608          * otherwise during replay, the replay fid will be inconsistent with
1609          * last_used/create fid */
1610         if (osp_precreate_end_seq(d) && osp_is_fid_client(d))
1611                 th->th_sync = 1;
1612
1613         local_th = osp_get_storage_thandle(env, th, d);
1614         if (IS_ERR(local_th))
1615                 RETURN(PTR_ERR(local_th));
1616         /*
1617          * it's OK if the import is inactive by this moment - id was created
1618          * by OST earlier, we just need to maintain it consistently on the disk
1619          * once import is reconnected, OSP will claim this and other objects
1620          * used and OST either keep them, if they exist or recreate
1621          */
1622
1623         /* we might have lost precreated objects */
1624         if (unlikely(d->opd_gap_count) > 0) {
1625                 LASSERT(d->opd_pre != NULL);
1626                 spin_lock(&d->opd_pre_lock);
1627                 if (d->opd_gap_count > 0) {
1628                         int count = d->opd_gap_count;
1629
1630                         rc = ostid_set_id(&osi->osi_oi,
1631                                           fid_oid(&d->opd_gap_start_fid));
1632                         if (rc) {
1633                                 spin_unlock(&d->opd_pre_lock);
1634                                 RETURN(rc);
1635                         }
1636                         d->opd_gap_count = 0;
1637                         spin_unlock(&d->opd_pre_lock);
1638
1639                         CDEBUG(D_HA, "Writing gap "DFID"+%d in llog\n",
1640                                PFID(&d->opd_gap_start_fid), count);
1641                         /* real gap handling is disabled intil ORI-692 will be
1642                          * fixed, now we only report gaps */
1643                 } else {
1644                         spin_unlock(&d->opd_pre_lock);
1645                 }
1646         }
1647
1648         /* Only need update last_used oid file, seq file will only be update
1649          * during seq rollover */
1650         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
1651                            &d->opd_last_id, d->opd_index);
1652
1653         rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
1654                              &osi->osi_off, local_th);
1655
1656         CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
1657                d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
1658
1659         RETURN(rc);
1660 }
1661
1662 /**
1663  * Implement OSP layer dt_object_operations::do_declare_destroy() interface.
1664  *
1665  * Declare that the caller will destroy the specified OST object.
1666  *
1667  * The OST object destroy will be handled via llog asynchronously. This
1668  * function will declare the credits for generating MDS_UNLINK64_REC llog.
1669  *
1670  * \param[in] env       pointer to the thread context
1671  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
1672  * \param[in] th        pointer to the transaction handler
1673  *
1674  * \retval              0 for success
1675  * \retval              negative error number on failure
1676  */
1677 static int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt,
1678                                struct thandle *th)
1679 {
1680         struct osp_object       *o = dt2osp_obj(dt);
1681         struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
1682         int                      rc = 0;
1683
1684         ENTRY;
1685
1686         LASSERT(!osp->opd_connect_mdt);
1687
1688         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
1689                 rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1690
1691         RETURN(rc);
1692 }
1693
1694 /**
1695  * Implement OSP layer dt_object_operations::do_destroy() interface.
1696  *
1697  * Destroy the specified OST object.
1698  *
1699  * The OSP generates a MDS_UNLINK64_REC record in the llog. There
1700  * will be some dedicated thread to handle the llog asynchronously.
1701  *
1702  * It also marks the object as non-cached.
1703  *
1704  * \param[in] env       pointer to the thread context
1705  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
1706  * \param[in] th        pointer to the transaction handler
1707  *
1708  * \retval              0 for success
1709  * \retval              negative error number on failure
1710  */
1711 static int osp_destroy(const struct lu_env *env, struct dt_object *dt,
1712                        struct thandle *th)
1713 {
1714         struct osp_object       *o = dt2osp_obj(dt);
1715         struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
1716         int                      rc = 0;
1717
1718         ENTRY;
1719
1720         o->opo_non_exist = 1;
1721
1722         LASSERT(!osp->opd_connect_mdt);
1723
1724         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) {
1725                 /* once transaction is committed put proper command on
1726                  * the queue going to our OST. */
1727                 rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1728                 if (rc < 0)
1729                         RETURN(rc);
1730         }
1731
1732         /* not needed in cache any more */
1733         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1734
1735         RETURN(rc);
1736 }
1737
1738 static int osp_orphan_index_lookup(const struct lu_env *env,
1739                                    struct dt_object *dt,
1740                                    struct dt_rec *rec,
1741                                    const struct dt_key *key)
1742 {
1743         return -EOPNOTSUPP;
1744 }
1745
1746 static int osp_orphan_index_declare_insert(const struct lu_env *env,
1747                                            struct dt_object *dt,
1748                                            const struct dt_rec *rec,
1749                                            const struct dt_key *key,
1750                                            struct thandle *handle)
1751 {
1752         return -EOPNOTSUPP;
1753 }
1754
1755 static int osp_orphan_index_insert(const struct lu_env *env,
1756                                    struct dt_object *dt,
1757                                    const struct dt_rec *rec,
1758                                    const struct dt_key *key,
1759                                    struct thandle *handle)
1760 {
1761         return -EOPNOTSUPP;
1762 }
1763
1764 static int osp_orphan_index_declare_delete(const struct lu_env *env,
1765                                            struct dt_object *dt,
1766                                            const struct dt_key *key,
1767                                            struct thandle *handle)
1768 {
1769         return -EOPNOTSUPP;
1770 }
1771
1772 static int osp_orphan_index_delete(const struct lu_env *env,
1773                                    struct dt_object *dt,
1774                                    const struct dt_key *key,
1775                                    struct thandle *handle)
1776 {
1777         return -EOPNOTSUPP;
1778 }
1779
1780 /**
1781  * Initialize the OSP layer index iteration.
1782  *
1783  * \param[in] env       pointer to the thread context
1784  * \param[in] dt        pointer to the index object to be iterated
1785  * \param[in] attr      unused
1786  *
1787  * \retval              pointer to the iteration structure
1788  * \retval              negative error number on failure
1789  */
1790 struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
1791                           __u32 attr)
1792 {
1793         struct osp_it *it;
1794
1795         OBD_ALLOC_PTR(it);
1796         if (it == NULL)
1797                 return ERR_PTR(-ENOMEM);
1798
1799         it->ooi_pos_ent = -1;
1800         it->ooi_obj = dt;
1801         it->ooi_attr = attr;
1802
1803         return (struct dt_it *)it;
1804 }
1805
1806 /**
1807  * Finalize the OSP layer index iteration.
1808  *
1809  * \param[in] env       pointer to the thread context
1810  * \param[in] di        pointer to the iteration structure
1811  */
1812 void osp_it_fini(const struct lu_env *env, struct dt_it *di)
1813 {
1814         struct osp_it   *it = (struct osp_it *)di;
1815         struct page     **pages = it->ooi_pages;
1816         int             npages = it->ooi_total_npages;
1817         int             i;
1818
1819         if (pages != NULL) {
1820                 for (i = 0; i < npages; i++) {
1821                         if (pages[i] != NULL) {
1822                                 if (pages[i] == it->ooi_cur_page) {
1823                                         kunmap(pages[i]);
1824                                         it->ooi_cur_page = NULL;
1825                                 }
1826                                 __free_page(pages[i]);
1827                         }
1828                 }
1829                 OBD_FREE_PTR_ARRAY(pages, npages);
1830         }
1831         OBD_FREE_PTR(it);
1832 }
1833
1834 /**
1835  * Get more records for the iteration from peer.
1836  *
1837  * The new records will be filled in an array of pages. The OSP side
1838  * allows 1MB bulk data to be transferred.
1839  *
1840  * \param[in] env       pointer to the thread context
1841  * \param[in] it        pointer to the iteration structure
1842  *
1843  * \retval              0 for success
1844  * \retval              negative error number on failure
1845  */
1846 static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
1847 {
1848         struct lu_device         *dev   = it->ooi_obj->do_lu.lo_dev;
1849         struct osp_device        *osp   = lu2osp_dev(dev);
1850         struct page             **pages;
1851         struct ptlrpc_request    *req   = NULL;
1852         struct ptlrpc_bulk_desc  *desc;
1853         struct idx_info          *ii;
1854         int                       npages;
1855         int                       rc;
1856         int                       i;
1857         ENTRY;
1858
1859         /* 1MB bulk */
1860         npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20);
1861         npages /= PAGE_SIZE;
1862
1863         OBD_ALLOC_PTR_ARRAY(pages, npages);
1864         if (pages == NULL)
1865                 RETURN(-ENOMEM);
1866
1867         it->ooi_pages = pages;
1868         it->ooi_total_npages = npages;
1869         for (i = 0; i < npages; i++) {
1870                 pages[i] = alloc_page(GFP_NOFS);
1871                 if (pages[i] == NULL)
1872                         RETURN(-ENOMEM);
1873         }
1874
1875         req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import,
1876                                    &RQF_OBD_IDX_READ);
1877         if (req == NULL)
1878                 RETURN(-ENOMEM);
1879
1880         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
1881         if (rc != 0) {
1882                 ptlrpc_request_free(req);
1883                 RETURN(rc);
1884         }
1885
1886         osp_set_req_replay(osp, req);
1887         req->rq_request_portal = OUT_PORTAL;
1888         ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
1889         memset(ii, 0, sizeof(*ii));
1890         if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) {
1891                 /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE,
1892                  * ost_index, 0] with LAST_ID FID, so it needs to replace
1893                  * the FID with orphan FID here */
1894                 ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE;
1895                 ii->ii_fid.f_oid = osp->opd_index;
1896                 ii->ii_fid.f_ver = 0;
1897                 ii->ii_flags = II_FL_NOHASH;
1898                 ii->ii_attrs = osp_dev2node(osp);
1899         } else {
1900                 ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu);
1901                 ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY |
1902                                II_FL_VARREC;
1903                 ii->ii_attrs = it->ooi_attr;
1904         }
1905         ii->ii_magic = IDX_INFO_MAGIC;
1906         ii->ii_count = npages * LU_PAGE_COUNT;
1907         ii->ii_hash_start = it->ooi_next;
1908
1909         ptlrpc_at_set_req_timeout(req);
1910
1911         desc = ptlrpc_prep_bulk_imp(req, npages, 1,
1912                                     PTLRPC_BULK_PUT_SINK,
1913                                     MDS_BULK_PORTAL,
1914                                     &ptlrpc_bulk_kiov_pin_ops);
1915         if (desc == NULL)
1916                 GOTO(out, rc = -ENOMEM);
1917
1918         for (i = 0; i < npages; i++)
1919                 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
1920                                                  PAGE_SIZE);
1921
1922         ptlrpc_request_set_replen(req);
1923         rc = ptlrpc_queue_wait(req);
1924         if (rc != 0)
1925                 GOTO(out, rc);
1926
1927         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1928                                           req->rq_bulk->bd_nob_transferred);
1929         if (rc < 0)
1930                 GOTO(out, rc);
1931         rc = 0;
1932
1933         ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
1934         if (ii->ii_magic != IDX_INFO_MAGIC)
1935                  GOTO(out, rc = -EPROTO);
1936
1937         npages = (ii->ii_count + LU_PAGE_COUNT - 1) >>
1938                  (PAGE_SHIFT - LU_PAGE_SHIFT);
1939         if (npages > it->ooi_total_npages) {
1940                 CERROR("%s: returned more pages than expected, %u > %u\n",
1941                        osp->opd_obd->obd_name, npages, it->ooi_total_npages);
1942                 GOTO(out, rc = -EINVAL);
1943         }
1944
1945         it->ooi_rec_size = ii->ii_recsize;
1946         it->ooi_valid_npages = npages;
1947         if (req_capsule_rep_need_swab(&req->rq_pill))
1948                 it->ooi_swab = 1;
1949
1950         it->ooi_next = ii->ii_hash_end;
1951
1952 out:
1953         ptlrpc_req_finished(req);
1954
1955         return rc;
1956 }
1957
1958 /**
1959  * Move the iteration cursor to the next lu_page.
1960  *
1961  * One system page (PAGE_SIZE) may contain multiple lu_page (4KB),
1962  * that depends on the LU_PAGE_COUNT. If it is not the last lu_page
1963  * in current system page, then move the iteration cursor to the next
1964  * lu_page in current system page. Otherwise, if there are more system
1965  * pages in the cache, then move the iteration cursor to the next system
1966  * page. If all the cached records (pages) have been iterated, then fetch
1967  * more records via osp_it_fetch().
1968  *
1969  * \param[in] env       pointer to the thread context
1970  * \param[in] di        pointer to the iteration structure
1971  *
1972  * \retval              positive for end of the directory
1973  * \retval              0 for success
1974  * \retval              negative error number on failure
1975  */
1976 int osp_it_next_page(const struct lu_env *env, struct dt_it *di)
1977 {
1978         struct osp_it           *it = (struct osp_it *)di;
1979         struct lu_idxpage       *idxpage;
1980         struct page             **pages;
1981         int                     rc;
1982         int                     i;
1983         ENTRY;
1984
1985 process_idxpage:
1986         idxpage = it->ooi_cur_idxpage;
1987         if (idxpage != NULL) {
1988                 if (idxpage->lip_nr == 0)
1989                         goto finish_cur_idxpage;
1990
1991                 if (it->ooi_pos_ent < idxpage->lip_nr) {
1992                         CDEBUG(D_INFO, "ooi_pos %d nr %d\n",
1993                                (int)it->ooi_pos_ent, (int)idxpage->lip_nr);
1994                         RETURN(0);
1995                 }
1996 finish_cur_idxpage:
1997                 it->ooi_cur_idxpage = NULL;
1998                 it->ooi_pos_lu_page++;
1999
2000 process_page:
2001                 if (it->ooi_pos_lu_page < LU_PAGE_COUNT) {
2002                         it->ooi_cur_idxpage = (void *)it->ooi_cur_page +
2003                                          LU_PAGE_SIZE * it->ooi_pos_lu_page;
2004                         if (it->ooi_swab)
2005                                 lustre_swab_lip_header(it->ooi_cur_idxpage);
2006                         if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) {
2007                                 struct osp_device *osp =
2008                                         lu2osp_dev(it->ooi_obj->do_lu.lo_dev);
2009
2010                                 CERROR("%s: invalid magic (%x != %x) for page "
2011                                        "%d/%d while read layout orphan index\n",
2012                                        osp->opd_obd->obd_name,
2013                                        it->ooi_cur_idxpage->lip_magic,
2014                                        LIP_MAGIC, it->ooi_pos_page,
2015                                        it->ooi_pos_lu_page);
2016                                 /* Skip this lu_page next time. */
2017                                 it->ooi_pos_ent = idxpage->lip_nr - 1;
2018                                 RETURN(-EINVAL);
2019                         }
2020                         it->ooi_pos_ent = -1;
2021                         goto process_idxpage;
2022                 }
2023
2024                 kunmap(it->ooi_cur_page);
2025                 it->ooi_cur_page = NULL;
2026                 it->ooi_pos_page++;
2027
2028 start:
2029                 pages = it->ooi_pages;
2030                 if (it->ooi_pos_page < it->ooi_valid_npages) {
2031                         it->ooi_cur_page = kmap(pages[it->ooi_pos_page]);
2032                         it->ooi_pos_lu_page = 0;
2033                         goto process_page;
2034                 }
2035
2036                 for (i = 0; i < it->ooi_total_npages; i++) {
2037                         if (pages[i] != NULL)
2038                                 __free_page(pages[i]);
2039                 }
2040                 OBD_FREE_PTR_ARRAY(pages, it->ooi_total_npages);
2041
2042                 it->ooi_pos_page = 0;
2043                 it->ooi_total_npages = 0;
2044                 it->ooi_valid_npages = 0;
2045                 it->ooi_swab = 0;
2046                 it->ooi_ent = NULL;
2047                 it->ooi_cur_page = NULL;
2048                 it->ooi_cur_idxpage = NULL;
2049                 it->ooi_pages = NULL;
2050         }
2051
2052         if (it->ooi_next == II_END_OFF)
2053                 RETURN(1);
2054
2055         rc = osp_it_fetch(env, it);
2056         if (rc == 0)
2057                 goto start;
2058
2059         RETURN(rc);
2060 }
2061
2062 /**
2063  * Move the iteration cursor to the next record.
2064  *
2065  * If there are more records in the lu_page, then move the iteration
2066  * cursor to the next record directly. Otherwise, move the iteration
2067  * cursor to the record in the next lu_page via osp_it_next_page()
2068  *
2069  * \param[in] env       pointer to the thread context
2070  * \param[in] di        pointer to the iteration structure
2071  *
2072  * \retval              positive for end of the directory
2073  * \retval              0 for success
2074  * \retval              negative error number on failure
2075  */
2076 static int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di)
2077 {
2078         struct osp_it           *it = (struct osp_it *)di;
2079         struct lu_idxpage       *idxpage;
2080         int                     rc;
2081         ENTRY;
2082
2083 again:
2084         idxpage = it->ooi_cur_idxpage;
2085         if (idxpage != NULL) {
2086                 if (idxpage->lip_nr == 0)
2087                         RETURN(1);
2088
2089                 it->ooi_pos_ent++;
2090                 if (it->ooi_pos_ent < idxpage->lip_nr) {
2091                         if (it->ooi_rec_size ==
2092                                         sizeof(struct lu_orphan_rec_v3)) {
2093                                 it->ooi_ent =
2094                                 (struct lu_orphan_ent_v3 *)idxpage->lip_entries+
2095                                                         it->ooi_pos_ent;
2096                                 if (it->ooi_swab)
2097                                         lustre_swab_orphan_ent_v3(it->ooi_ent);
2098                         } else if (it->ooi_rec_size ==
2099                                         sizeof(struct lu_orphan_rec_v2)) {
2100                                 it->ooi_ent =
2101                                 (struct lu_orphan_ent_v2 *)idxpage->lip_entries+
2102                                                         it->ooi_pos_ent;
2103                                 if (it->ooi_swab)
2104                                         lustre_swab_orphan_ent_v2(it->ooi_ent);
2105                         } else {
2106                                 it->ooi_ent =
2107                                 (struct lu_orphan_ent *)idxpage->lip_entries +
2108                                                         it->ooi_pos_ent;
2109                                 if (it->ooi_swab)
2110                                         lustre_swab_orphan_ent(it->ooi_ent);
2111                         }
2112                         RETURN(0);
2113                 }
2114         }
2115
2116         rc = osp_it_next_page(env, di);
2117         if (rc == 0)
2118                 goto again;
2119
2120         RETURN(rc);
2121 }
2122
2123 int osp_it_get(const struct lu_env *env, struct dt_it *di,
2124                const struct dt_key *key)
2125 {
2126         return 1;
2127 }
2128
2129 void osp_it_put(const struct lu_env *env, struct dt_it *di)
2130 {
2131 }
2132
2133 static struct dt_key *osp_orphan_it_key(const struct lu_env *env,
2134                                         const struct dt_it *di)
2135 {
2136         struct osp_it   *it  = (struct osp_it *)di;
2137         struct lu_orphan_ent    *ent = (struct lu_orphan_ent *)it->ooi_ent;
2138
2139         if (likely(ent != NULL))
2140                 return (struct dt_key *)(&ent->loe_key);
2141
2142         return NULL;
2143 }
2144
2145 static int osp_orphan_it_key_size(const struct lu_env *env,
2146                                   const struct dt_it *di)
2147 {
2148         return sizeof(struct lu_fid);
2149 }
2150
2151 static int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di,
2152                              struct dt_rec *rec, __u32 attr)
2153 {
2154         struct osp_it *it = (struct osp_it *)di;
2155
2156         if (likely(it->ooi_ent)) {
2157                 if (it->ooi_rec_size == sizeof(struct lu_orphan_rec_v3)) {
2158                         struct lu_orphan_ent_v3 *ent =
2159                                 (struct lu_orphan_ent_v3 *)it->ooi_ent;
2160
2161                         *(struct lu_orphan_rec_v3 *)rec = ent->loe_rec;
2162                 } else if (it->ooi_rec_size ==
2163                                 sizeof(struct lu_orphan_rec_v2)) {
2164                         struct lu_orphan_ent_v2 *ent =
2165                                 (struct lu_orphan_ent_v2 *)it->ooi_ent;
2166
2167                         *(struct lu_orphan_rec_v2 *)rec = ent->loe_rec;
2168                 } else {
2169                         struct lu_orphan_ent *ent =
2170                                 (struct lu_orphan_ent *)it->ooi_ent;
2171
2172                         *(struct lu_orphan_rec *)rec = ent->loe_rec;
2173                 }
2174                 return 0;
2175         }
2176
2177         return -EINVAL;
2178 }
2179
2180 __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
2181 {
2182         struct osp_it   *it = (struct osp_it *)di;
2183
2184         return it->ooi_next;
2185 }
2186
2187 /**
2188  * Locate the iteration cursor to the specified position (cookie).
2189  *
2190  * \param[in] env       pointer to the thread context
2191  * \param[in] di        pointer to the iteration structure
2192  * \param[in] hash      the specified position
2193  *
2194  * \retval              positive number for locating to the exactly position
2195  *                      or the next
2196  * \retval              0 for arriving at the end of the iteration
2197  * \retval              negative error number on failure
2198  */
2199 static int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
2200                               __u64 hash)
2201 {
2202         struct osp_it *it = (struct osp_it *)di;
2203         int rc;
2204
2205         it->ooi_next = hash;
2206         rc = osp_orphan_it_next(env, (struct dt_it *)di);
2207         if (rc == 1)
2208                 return 0;
2209
2210         if (rc == 0)
2211                 return 1;
2212
2213         return rc;
2214 }
2215
2216 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
2217                    void *key_rec)
2218 {
2219         return 0;
2220 }
2221
2222 static const struct dt_index_operations osp_orphan_index_ops = {
2223         .dio_lookup             = osp_orphan_index_lookup,
2224         .dio_declare_insert     = osp_orphan_index_declare_insert,
2225         .dio_insert             = osp_orphan_index_insert,
2226         .dio_declare_delete     = osp_orphan_index_declare_delete,
2227         .dio_delete             = osp_orphan_index_delete,
2228         .dio_it = {
2229                 .init           = osp_it_init,
2230                 .fini           = osp_it_fini,
2231                 .next           = osp_orphan_it_next,
2232                 .get            = osp_it_get,
2233                 .put            = osp_it_put,
2234                 .key            = osp_orphan_it_key,
2235                 .key_size       = osp_orphan_it_key_size,
2236                 .rec            = osp_orphan_it_rec,
2237                 .store          = osp_it_store,
2238                 .load           = osp_orphan_it_load,
2239                 .key_rec        = osp_it_key_rec,
2240         }
2241 };
2242
2243 /**
2244  * Implement OSP layer dt_object_operations::do_index_try() interface.
2245  *
2246  * Negotiate the index type.
2247  *
2248  * If the target index is an IDIF object, then use osp_orphan_index_ops.
2249  * Otherwise, assign osp_md_index_ops to the dt_object::do_index_ops.
2250  * (\see lustre/include/lustre_fid.h for IDIF.)
2251  *
2252  * \param[in] env       pointer to the thread context
2253  * \param[in] dt        pointer to the OSP layer dt_object
2254  * \param[in] feat      unused
2255  *
2256  * \retval              0 for success
2257  */
2258 static int osp_index_try(const struct lu_env *env,
2259                          struct dt_object *dt,
2260                          const struct dt_index_features *feat)
2261 {
2262         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2263
2264         if (fid_is_last_id(fid) && fid_is_idif(fid))
2265                 dt->do_index_ops = &osp_orphan_index_ops;
2266         else
2267                 dt->do_index_ops = &osp_md_index_ops;
2268         return 0;
2269 }
2270
2271 static const struct dt_object_operations osp_obj_ops = {
2272         .do_declare_attr_get    = osp_declare_attr_get,
2273         .do_attr_get            = osp_attr_get,
2274         .do_declare_attr_set    = osp_declare_attr_set,
2275         .do_attr_set            = osp_attr_set,
2276         .do_declare_xattr_get   = osp_declare_xattr_get,
2277         .do_xattr_get           = osp_xattr_get,
2278         .do_declare_xattr_set   = osp_declare_xattr_set,
2279         .do_xattr_set           = osp_xattr_set,
2280         .do_declare_create      = osp_declare_create,
2281         .do_create              = osp_create,
2282         .do_declare_destroy     = osp_declare_destroy,
2283         .do_destroy             = osp_destroy,
2284         .do_index_try           = osp_index_try,
2285 };
2286
2287 /**
2288  * Implement OSP layer lu_object_operations::loo_object_init() interface.
2289  *
2290  * Initialize the object.
2291  *
2292  * If it is a remote MDT object, then call do_attr_get() to fetch
2293  * the attribute from the peer.
2294  *
2295  * \param[in] env       pointer to the thread context
2296  * \param[in] o         pointer to the OSP layer lu_object
2297  * \param[in] conf      unused
2298  *
2299  * \retval              0 for success
2300  * \retval              negative error number on failure
2301  */
2302 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
2303                            const struct lu_object_conf *conf)
2304 {
2305         struct osp_object *po = lu2osp_obj(o);
2306         int rc = 0;
2307
2308         ENTRY;
2309
2310         o->lo_header->loh_attr |= LOHA_REMOTE;
2311
2312         if (is_ost_obj(o)) {
2313                 po->opo_obj.do_ops = &osp_obj_ops;
2314         } else {
2315                 struct lu_attr *la = &osp_env_info(env)->osi_attr;
2316
2317                 po->opo_obj.do_ops = &osp_md_obj_ops;
2318                 po->opo_obj.do_body_ops = &osp_md_body_ops;
2319
2320                 if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
2321                         po->opo_non_exist = 1;
2322                 } else {
2323                         rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
2324                                                              la);
2325                         if (rc == 0)
2326                                 o->lo_header->loh_attr |=
2327                                         LOHA_EXISTS | (la->la_mode & S_IFMT);
2328                         if (rc == -ENOENT) {
2329                                 po->opo_non_exist = 1;
2330                                 rc = 0;
2331                         }
2332                 }
2333         }
2334
2335         RETURN(rc);
2336 }
2337
2338 static void osp_object_free_rcu(struct rcu_head *head)
2339 {
2340         struct osp_object *obj = container_of(head, struct osp_object,
2341                                               opo_header.loh_rcu);
2342
2343         kmem_cache_free(osp_object_kmem, obj);
2344 }
2345
2346 /**
2347  * Implement OSP layer lu_object_operations::loo_object_free() interface.
2348  *
2349  * Finalize the object.
2350  *
2351  * If the OSP object has attributes cache, then destroy the cache.
2352  * Free the object finally.
2353  *
2354  * \param[in] env       pointer to the thread context
2355  * \param[in] o         pointer to the OSP layer lu_object
2356  */
2357 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
2358 {
2359         struct osp_object       *obj = lu2osp_obj(o);
2360         struct lu_object_header *h = o->lo_header;
2361         struct osp_xattr_entry *oxe;
2362         struct osp_xattr_entry *tmp;
2363         int                     count;
2364
2365         dt_object_fini(&obj->opo_obj);
2366         if (h)
2367                 lu_object_header_fini(h);
2368         list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
2369                 list_del_init(&oxe->oxe_list);
2370                 count = atomic_read(&oxe->oxe_ref);
2371                 LASSERTF(count == 1,
2372                          "Still has %d users on the xattr entry %.*s\n",
2373                          count-1, (int)oxe->oxe_namelen, oxe->oxe_name);
2374
2375                 osp_oac_xattr_free(oxe);
2376         }
2377         OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed");
2378         call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu);
2379 }
2380
2381 /**
2382  * Implement OSP layer lu_object_operations::loo_object_release() interface.
2383  *
2384  * Cleanup (not free) the object.
2385  *
2386  * If it is a reserved object but failed to be created, or it is an OST
2387  * object, then mark the object as non-cached.
2388  *
2389  * \param[in] env       pointer to the thread context
2390  * \param[in] o         pointer to the OSP layer lu_object
2391  */
2392 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
2393 {
2394         struct osp_object       *po = lu2osp_obj(o);
2395         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
2396
2397         ENTRY;
2398
2399         /*
2400          * release reservation if object was declared but not created
2401          * this may require lu_object_put() in LOD
2402          */
2403         if (unlikely(po->opo_reserved)) {
2404                 LASSERT(d->opd_pre != NULL);
2405                 LASSERT(d->opd_pre_reserved > 0);
2406                 spin_lock(&d->opd_pre_lock);
2407                 d->opd_pre_reserved--;
2408                 spin_unlock(&d->opd_pre_lock);
2409
2410                 /*
2411                  * Check that osp_precreate_cleanup_orphans is not blocked
2412                  * due to opd_pre_reserved > 0.
2413                  */
2414                 if (unlikely(d->opd_pre_reserved == 0 &&
2415                              (d->opd_pre_recovering || d->opd_pre_status)))
2416                         wake_up(&d->opd_pre_waitq);
2417
2418                 /* not needed in cache any more */
2419                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
2420         }
2421
2422         if (is_ost_obj(o))
2423                 /* XXX: Currently, NOT cache OST-object on MDT because:
2424                  *      1. it is not often accessed on MDT.
2425                  *      2. avoid up layer (such as LFSCK) to load too many
2426                  *         once-used OST-objects. */
2427                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
2428
2429         EXIT;
2430 }
2431
2432 static int osp_object_print(const struct lu_env *env, void *cookie,
2433                             lu_printer_t p, const struct lu_object *l)
2434 {
2435         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
2436
2437         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
2438 }
2439
2440 static int osp_object_invariant(const struct lu_object *o)
2441 {
2442         LBUG();
2443 }
2444
2445 const struct lu_object_operations osp_lu_obj_ops = {
2446         .loo_object_init        = osp_object_init,
2447         .loo_object_free        = osp_object_free,
2448         .loo_object_release     = osp_object_release,
2449         .loo_object_print       = osp_object_print,
2450         .loo_object_invariant   = osp_object_invariant
2451 };