Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osp / osp_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * lustre/osp/osp_object.c
30  *
31  * Lustre OST Proxy Device (OSP) is the agent on the local MDT for the OST
32  * or remote MDT.
33  *
34  * OSP object attributes cache
35  * ---------------------------
36  * OSP object is the stub of the remote OST-object or MDT-object. Both the
37  * attribute and the extended attributes are stored on the peer side remotely.
38  * It is inefficient to send RPC to peer to fetch those attributes when every
39  * get_attr()/get_xattr() called. For a large system, the LFSCK synchronous
40  * mode scanning is prohibitively inefficient.
41  *
42  * The OSP maintains the OSP object attributes cache to cache some
43  * attributes on the local MDT.
44  *
45  * The basic attributes, such as owner/mode/flags, are stored in the
46  * osp_object::opo_attr. The extended attributes will be stored
47  * as osp_xattr_entry. Every extended attribute has an independent
48  * osp_xattr_entry, and all the osp_xattr_entry are linked into the
49  * osp_object::opo_xattr_list. The OSP object attributes cache
50  * is protected by the osp_object::opo_lock.
51  *
52  * Not all OSP objects have an attributes cache because maintaining
53  * the cache requires some resources. Currently, the OSP object
54  * attributes cache will be initialized when the attributes or the
55  * extended attributes are pre-fetched via osp_declare_attr_get()
56  * or osp_declare_xattr_get(). That is usually for LFSCK purpose,
57  * but it also can be shared by others.
58  *
59  *
60  * XXX: NOT prepare out RPC for remote transaction. ((please refer to the
61  *      comment of osp_trans_create() for remote transaction)
62  *
63  * According to our current transaction/dt_object_lock framework (to make
64  * the cross-MDTs modification for DNE1 to be workable), the transaction
65  * sponsor will start the transaction firstly, then try to acquire related
66  * dt_object_lock if needed. Under such rules, if we want to prepare the
67  * OUT RPC in the transaction declare phase, then related attr/xattr
68  * should be known without dt_object_lock. But such condition maybe not
69  * true for some remote transaction case. For example:
70  *
71  * For linkEA repairing (by LFSCK) case, before the LFSCK thread obtained
72  * the dt_object_lock on the target MDT-object, it cannot know whether
73  * the MDT-object has linkEA or not, neither invalid or not.
74  *
75  * Since the LFSCK thread cannot hold dt_object_lock before the remote
76  * transaction start (otherwise there will be some potential deadlock),
77  * it cannot prepare related OUT RPC for repairing during the declare
78  * phase as other normal transactions do.
79  *
80  * To resolve the trouble, we will make OSP to prepare related OUT RPC
81  * after remote transaction started, and trigger the remote updating
82  * (send RPC) when trans_stop. Then the up layer users, such as LFSCK,
83  * can follow the general rule to handle trans_start/dt_object_lock
84  * for repairing linkEA inconsistency without distinguishing remote
85  * MDT-object.
86  *
87  * In fact, above solution for remote transaction should be the normal
88  * model without considering DNE1. The trouble brought by DNE1 will be
89  * resolved in DNE2. At that time, this patch can be removed.
90  *
91  *
92  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
93  * Author: Mikhail Pershin <mike.tappro@intel.com>
94  */
95
96 #define DEBUG_SUBSYSTEM S_MDS
97
98 #include <lustre_obdo.h>
99 #include <lustre_swab.h>
100
101 #include "osp_internal.h"
102
103 static inline __u32 osp_dev2node(struct osp_device *osp)
104 {
105         return osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
106 }
107
108 static inline const char *osp_dto2name(struct osp_object *obj)
109 {
110         return obj->opo_obj.do_lu.lo_dev->ld_obd->obd_name;
111 }
112
113 static inline bool is_ost_obj(struct lu_object *lo)
114 {
115         return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
116 }
117
118 static inline void __osp_oac_xattr_assignment(struct osp_object *obj,
119                                               struct osp_xattr_entry *oxe,
120                                               const struct lu_buf *buf)
121 {
122         if (buf->lb_len > 0)
123                 memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
124
125         oxe->oxe_vallen = buf->lb_len;
126         oxe->oxe_exist = 1;
127         oxe->oxe_ready = 1;
128 }
129
130 /**
131  * Assign FID to the OST object.
132  *
133  * This function will assign the FID to the OST object of a striped file.
134  *
135  * \param[in] env       pointer to the thread context
136  * \param[in] d         pointer to the OSP device
137  * \param[in] o         pointer to the OSP object that the FID will be
138  *                      assigned to
139  */
140 static void osp_object_assign_fid(const struct lu_env *env,
141                                   struct osp_device *d, struct osp_object *o)
142 {
143         struct osp_thread_info *osi = osp_env_info(env);
144
145         LASSERT(fid_is_zero(lu_object_fid(&o->opo_obj.do_lu)));
146         LASSERT(o->opo_reserved);
147         o->opo_reserved = 0;
148
149         osp_precreate_get_fid(env, d, &osi->osi_fid);
150
151         lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
152 }
153
154 #define OXE_DEFAULT_LEN 16
155
156 /**
157  * Allocate osp_xattr_entry.
158  *
159  * If total size exceeds PAGE_SIZE, name and value will allocated in a
160  * separate buf, otherwise it's allocated inline.
161  *
162  * \param[in] name      pointer to XATTR name
163  * \param[in] namelen   XATTR name len
164  * \param[in] vallen    XATTR value len
165  * \retval              oxe pointer on success
166  * \retval              NULL on failure
167  */
168 static struct osp_xattr_entry *osp_oac_xattr_alloc(const char *name,
169                                                    size_t namelen,
170                                                    size_t vallen)
171 {
172         struct osp_xattr_entry *oxe;
173         size_t size;
174
175         if (!vallen)
176                 vallen = OXE_DEFAULT_LEN;
177         size = sizeof(*oxe) + namelen + 1 + vallen;
178         if (likely(size <= PAGE_SIZE)) {
179                 OBD_ALLOC(oxe, size);
180                 if (unlikely(!oxe))
181                         return NULL;
182                 oxe->oxe_buflen = size;
183                 oxe->oxe_value = oxe->oxe_name + namelen + 1;
184         } else {
185                 char *buf;
186
187                 OBD_ALLOC_LARGE(buf, vallen);
188                 if (unlikely(!buf))
189                         return NULL;
190
191                 size -= vallen;
192                 OBD_ALLOC(oxe, size);
193                 if (unlikely(!oxe)) {
194                         OBD_FREE(buf, size);
195                         return NULL;
196                 }
197                 oxe->oxe_buflen = vallen;
198                 oxe->oxe_value = buf;
199                 oxe->oxe_largebuf = 1;
200         }
201
202         INIT_LIST_HEAD(&oxe->oxe_list);
203
204         oxe->oxe_namelen = namelen;
205         memcpy(oxe->oxe_name, name, namelen);
206         /* One ref is for the caller, the other is for the entry on the list. */
207         atomic_set(&oxe->oxe_ref, 2);
208
209         return oxe;
210 }
211
212 static void osp_oac_xattr_free(struct osp_xattr_entry *oxe)
213 {
214         LASSERT(list_empty(&oxe->oxe_list));
215         if (unlikely(oxe->oxe_largebuf)) {
216                 OBD_FREE_LARGE(oxe->oxe_value, oxe->oxe_buflen);
217                 OBD_FREE(oxe, sizeof(*oxe) + oxe->oxe_namelen + 1);
218         } else {
219                 OBD_FREE(oxe, oxe->oxe_buflen);
220         }
221 }
222
223 /**
224  * Release reference from the OSP object extended attribute entry.
225  *
226  * If it is the last reference, then free the entry.
227  *
228  * \param[in] oxe       pointer to the OSP object extended attribute entry.
229  */
230 static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
231 {
232         if (atomic_dec_and_test(&oxe->oxe_ref))
233                 osp_oac_xattr_free(oxe);
234 }
235
236 /**
237  * Find the named extended attribute in the OSP object attributes cache.
238  *
239  * The caller should take the osp_object::opo_lock before calling
240  * this function.
241  *
242  * \param[in] obj       pointer to the OSP object
243  * \param[in] name      the name of the extended attribute
244  * \param[in] namelen   the name length of the extended attribute
245  *
246  * \retval              pointer to the found extended attribute entry
247  * \retval              NULL if the specified extended attribute is not
248  *                      in the cache
249  */
250 static struct osp_xattr_entry *
251 osp_oac_xattr_find_locked(struct osp_object *obj, const char *name,
252                           size_t namelen)
253 {
254         struct osp_xattr_entry *oxe;
255
256         list_for_each_entry(oxe, &obj->opo_xattr_list, oxe_list) {
257                 if (namelen == oxe->oxe_namelen &&
258                     strncmp(name, oxe->oxe_name, namelen) == 0)
259                         return oxe;
260         }
261
262         return NULL;
263 }
264
265 /**
266  * Find the named extended attribute in the OSP object attributes cache.
267  *
268  * Call osp_oac_xattr_find_locked() with the osp_object::opo_lock held.
269  *
270  * \param[in] obj       pointer to the OSP object
271  * \param[in] name      the name of the extended attribute
272  * \param[in] unlink    true if the extended attribute entry is to be removed
273  *                      from the cache
274  *
275  * \retval              pointer to the found extended attribute entry
276  * \retval              NULL if the specified extended attribute is not
277  *                      in the cache
278  */
279 static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
280                                                   const char *name, bool unlink)
281 {
282         struct osp_xattr_entry *oxe = NULL;
283
284         spin_lock(&obj->opo_lock);
285         oxe = osp_oac_xattr_find_locked(obj, name, strlen(name));
286         if (oxe) {
287                 if (unlink)
288                         list_del_init(&oxe->oxe_list);
289                 else
290                         atomic_inc(&oxe->oxe_ref);
291         }
292         spin_unlock(&obj->opo_lock);
293
294         return oxe;
295 }
296
297 /**
298  * Find the named extended attribute in the OSP object attributes cache.
299  *
300  * If it is not in the cache, then add an empty entry (that will be
301  * filled later) to cache with the given name.
302  *
303  * \param[in] obj       pointer to the OSP object
304  * \param[in] name      the name of the extended attribute
305  * \param[in] len       the length of the extended attribute value
306  *
307  * \retval              pointer to the found or new-created extended
308  *                      attribute entry
309  * \retval              NULL if the specified extended attribute is not in the
310  *                      cache or fail to add new empty entry to the cache.
311  */
312 static struct osp_xattr_entry *
313 osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
314 {
315         struct osp_xattr_entry *oxe;
316         struct osp_xattr_entry *tmp = NULL;
317         size_t namelen = strlen(name);
318
319         oxe = osp_oac_xattr_find(obj, name, false);
320         if (oxe)
321                 return oxe;
322
323         oxe = osp_oac_xattr_alloc(name, namelen, len);
324         if (!oxe)
325                 return NULL;
326
327         spin_lock(&obj->opo_lock);
328         tmp = osp_oac_xattr_find_locked(obj, name, namelen);
329         if (!tmp)
330                 list_add_tail(&oxe->oxe_list, &obj->opo_xattr_list);
331         else
332                 atomic_inc(&tmp->oxe_ref);
333         spin_unlock(&obj->opo_lock);
334
335         if (tmp) {
336                 osp_oac_xattr_free(oxe);
337                 oxe = tmp;
338         }
339
340         return oxe;
341 }
342
343 /**
344  * Check whether \a oxe is large enough to hold the xattr value
345  *
346  * \param[in] oxe       pointer to the OSP object attributes cache xattr entry
347  * \param[in] len       xattr value size in bytes
348  *
349  * \retval              true if xattr can fit in \a oxe
350  * \retval              false if xattr can not fit in \a oxe
351  */
352 static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len)
353 {
354         if (unlikely(oxe->oxe_largebuf))
355                 return oxe->oxe_buflen >= len;
356
357         return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) >= len;
358 }
359
360 /**
361  * Assign the cached OST-object's EA with the given value.
362  *
363  * If the current EA entry in cache has not enough space to hold the new
364  * value, remove it, create a new one, then assign with the given value.
365  *
366  * \param[in] obj       pointer to the OSP object
367  * \param[in] oxe       pointer to the cached EA entry to be assigned
368  * \param[in] buf       pointer to the buffer with new EA value
369  *
370  * \retval              pointer to the new created EA entry in cache if
371  *                      current entry is not big enough; otherwise, the
372  *                      input 'oxe' will be returned.
373  */
374 static struct osp_xattr_entry *
375 osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe,
376                          const struct lu_buf *buf)
377 {
378         struct osp_xattr_entry *new = NULL;
379         struct osp_xattr_entry *old = NULL;
380         int namelen = oxe->oxe_namelen;
381         bool unlink_only = false;
382
383         if (!oxe_can_hold(oxe, buf->lb_len)) {
384                 new = osp_oac_xattr_alloc(oxe->oxe_name, namelen, buf->lb_len);
385                 if (likely(new)) {
386                         __osp_oac_xattr_assignment(obj, new, buf);
387                 } else {
388                         unlink_only = true;
389                         CWARN("%s: cannot update cached xattr %.*s of "DFID"\n",
390                               osp_dto2name(obj), namelen, oxe->oxe_name,
391                               PFID(lu_object_fid(&obj->opo_obj.do_lu)));
392                 }
393         }
394
395         spin_lock(&obj->opo_lock);
396         old = osp_oac_xattr_find_locked(obj, oxe->oxe_name, namelen);
397         if (likely(old)) {
398                 if (new) {
399                         /* Unlink the 'old'. */
400                         list_del_init(&old->oxe_list);
401
402                         /* Drop the ref for 'old' on list. */
403                         osp_oac_xattr_put(old);
404
405                         /* Drop the ref for current using. */
406                         osp_oac_xattr_put(oxe);
407                         oxe = new;
408
409                         /* Insert 'new' into list. */
410                         list_add_tail(&new->oxe_list, &obj->opo_xattr_list);
411                 } else if (unlink_only) {
412                         /* Unlink the 'old'. */
413                         list_del_init(&old->oxe_list);
414
415                         /* Drop the ref for 'old' on list. */
416                         osp_oac_xattr_put(old);
417                 } else {
418                         __osp_oac_xattr_assignment(obj, oxe, buf);
419                 }
420         } else if (new) {
421                 /* Drop the ref for current using. */
422                 osp_oac_xattr_put(oxe);
423                 oxe = new;
424
425                 /* Someone unlinked the 'old' by race,
426                  * insert the 'new' one into list. */
427                 list_add_tail(&new->oxe_list, &obj->opo_xattr_list);
428         }
429         spin_unlock(&obj->opo_lock);
430
431         return oxe;
432 }
433
434 /**
435  * Parse the OSP object attribute from the RPC reply.
436  *
437  * If the attribute is valid, then it will be added to the OSP object
438  * attributes cache.
439  *
440  * \param[in] env       pointer to the thread context
441  * \param[in] reply     pointer to the RPC reply
442  * \param[in] req       pointer to the RPC request
443  * \param[out] attr     pointer to buffer to hold the output attribute
444  * \param[in] obj       pointer to the OSP object
445  * \param[in] index     the index of the attribute buffer in the reply
446  *
447  * \retval              0 for success
448  * \retval              negative error number on failure
449  */
450 static int osp_get_attr_from_reply(const struct lu_env *env,
451                                    struct object_update_reply *reply,
452                                    struct ptlrpc_request *req,
453                                    struct lu_attr *attr,
454                                    struct osp_object *obj, int index)
455 {
456         struct osp_thread_info  *osi    = osp_env_info(env);
457         struct lu_buf           *rbuf   = &osi->osi_lb2;
458         struct obdo             *lobdo  = &osi->osi_obdo;
459         struct obdo             *wobdo;
460         int                     rc;
461
462         rc = object_update_result_data_get(reply, rbuf, index);
463         if (rc < 0)
464                 return rc;
465
466         wobdo = rbuf->lb_buf;
467         if (rbuf->lb_len != sizeof(*wobdo))
468                 return -EPROTO;
469
470         LASSERT(req != NULL);
471         if (req_capsule_req_need_swab(&req->rq_pill))
472                 lustre_swab_obdo(wobdo);
473
474         lustre_get_wire_obdo(NULL, lobdo, wobdo);
475         if (obj) {
476                 spin_lock(&obj->opo_lock);
477                 la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid);
478                 spin_unlock(&obj->opo_lock);
479         }
480         if (attr)
481                 la_from_obdo(attr, lobdo, lobdo->o_valid);
482
483         return 0;
484 }
485
486 /**
487  * Interpreter function for getting OSP object attribute asynchronously.
488  *
489  * Called to interpret the result of an async mode RPC for getting the
490  * OSP object attribute.
491  *
492  * \param[in] env       pointer to the thread context
493  * \param[in] reply     pointer to the RPC reply
494  * \param[in] req       pointer to the RPC request
495  * \param[in] obj       pointer to the OSP object
496  * \param[out] data     pointer to buffer to hold the output attribute
497  * \param[in] index     the index of the attribute buffer in the reply
498  * \param[in] rc        the result for handling the RPC
499  *
500  * \retval              0 for success
501  * \retval              negative error number on failure
502  */
503 static int osp_attr_get_interpterer(const struct lu_env *env,
504                                     struct object_update_reply *reply,
505                                     struct ptlrpc_request *req,
506                                     struct osp_object *obj,
507                                     void *data, int index, int rc)
508 {
509         struct lu_attr *attr = data;
510
511         if (rc == 0) {
512                 osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
513                 obj->opo_non_exist = 0;
514
515                 return osp_get_attr_from_reply(env, reply, req, NULL, obj,
516                                                index);
517         } else {
518                 if (rc == -ENOENT) {
519                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
520                         obj->opo_non_exist = 1;
521                 }
522
523                 spin_lock(&obj->opo_lock);
524                 attr->la_valid = 0;
525                 spin_unlock(&obj->opo_lock);
526         }
527
528         return 0;
529 }
530
531 /**
532  * Implement OSP layer dt_object_operations::do_declare_attr_get() interface.
533  *
534  * Declare that the caller will get attribute from the specified OST object.
535  *
536  * This function adds an Object Unified Target (OUT) sub-request to the per-OSP
537  * based shared asynchronous request queue. The osp_attr_get_interpterer()
538  * is registered as the interpreter function to handle the result of this
539  * sub-request.
540  *
541  * \param[in] env       pointer to the thread context
542  * \param[in] dt        pointer to the OSP layer dt_object
543  *
544  * \retval              0 for success
545  * \retval              negative error number on failure
546  */
547 static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
548 {
549         struct osp_object       *obj    = dt2osp_obj(dt);
550         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
551         int                      rc     = 0;
552
553         mutex_lock(&osp->opd_async_requests_mutex);
554         rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
555                                       &obj->opo_attr, sizeof(struct obdo),
556                                       osp_attr_get_interpterer);
557         mutex_unlock(&osp->opd_async_requests_mutex);
558
559         return rc;
560 }
561
562 /**
563  * Implement OSP layer dt_object_operations::do_attr_get() interface.
564  *
565  * Get attribute from the specified MDT/OST object.
566  *
567  * If the attribute is in the OSP object attributes cache, then return
568  * the cached attribute directly. Otherwise it will trigger an OUT RPC
569  * to the peer to get the attribute synchronously, if successful, add it
570  * to the OSP attributes cache. (\see lustre/osp/osp_trans.c for OUT RPC.)
571  *
572  * \param[in] env       pointer to the thread context
573  * \param[in] dt        pointer to the OSP layer dt_object
574  * \param[out] attr     pointer to the buffer to hold the output attribute
575  *
576  * \retval              0 for success
577  * \retval              negative error number on failure
578  */
579 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
580                  struct lu_attr *attr)
581 {
582         struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
583         struct osp_object               *obj = dt2osp_obj(dt);
584         struct dt_device                *dev = &osp->opd_dt_dev;
585         struct osp_update_request       *update;
586         struct object_update_reply      *reply;
587         struct ptlrpc_request           *req = NULL;
588         int                             invalidated, cache = 0, rc = 0;
589         ENTRY;
590
591         if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
592                 RETURN(-ENOENT);
593         if (obj->opo_destroyed)
594                 RETURN(-ENOENT);
595
596         spin_lock(&obj->opo_lock);
597         if (obj->opo_attr.la_valid != 0 && !obj->opo_stale) {
598                 *attr = obj->opo_attr;
599                 spin_unlock(&obj->opo_lock);
600
601                 RETURN(0);
602         }
603         spin_unlock(&obj->opo_lock);
604
605         update = osp_update_request_create(dev);
606         if (IS_ERR(update))
607                 RETURN(PTR_ERR(update));
608
609         rc = OSP_UPDATE_RPC_PACK(env, out_attr_get_pack, update,
610                                  lu_object_fid(&dt->do_lu));
611         if (rc != 0) {
612                 CERROR("%s: Insert update error "DFID": rc = %d\n",
613                        dev->dd_lu_dev.ld_obd->obd_name,
614                        PFID(lu_object_fid(&dt->do_lu)), rc);
615
616                 GOTO(out_req, rc);
617         }
618
619         invalidated = atomic_read(&obj->opo_invalidate_seq);
620
621         rc = osp_remote_sync(env, osp, update, &req);
622
623         down_read(&obj->opo_invalidate_sem);
624         if (invalidated == atomic_read(&obj->opo_invalidate_seq)) {
625                 /* no invalited has came so far, we can cache the attrs */
626                 cache = 1;
627         }
628
629         if (rc != 0) {
630                 if (rc == -ENOENT) {
631                         osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
632                         if (cache)
633                                 obj->opo_non_exist = 1;
634                 } else {
635                         CERROR("%s: osp_attr_get update error "DFID": rc = %d\n",
636                                dev->dd_lu_dev.ld_obd->obd_name,
637                                PFID(lu_object_fid(&dt->do_lu)), rc);
638                 }
639
640                 GOTO(out, rc);
641         }
642
643         osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
644         obj->opo_non_exist = 0;
645         reply = req_capsule_server_sized_get(&req->rq_pill,
646                                              &RMF_OUT_UPDATE_REPLY,
647                                              OUT_UPDATE_REPLY_SIZE);
648         if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
649                 GOTO(out, rc = -EPROTO);
650
651         rc = osp_get_attr_from_reply(env, reply, req, attr,
652                                      cache ? obj : NULL, 0);
653         if (rc != 0)
654                 GOTO(out, rc);
655
656         spin_lock(&obj->opo_lock);
657         if (cache)
658                 obj->opo_stale = 0;
659         spin_unlock(&obj->opo_lock);
660
661         GOTO(out, rc);
662
663 out:
664         up_read(&obj->opo_invalidate_sem);
665
666 out_req:
667         if (req != NULL)
668                 ptlrpc_req_finished(req);
669
670         osp_update_request_destroy(env, update);
671
672         return rc;
673 }
674
675 /**
676  * Implement OSP layer dt_object_operations::do_declare_attr_set() interface.
677  *
678  * If the transaction is not remote one, then declare the credits that will
679  * be used for the subsequent llog record for the object's attributes.
680  *
681  * \param[in] env       pointer to the thread context
682  * \param[in] dt        pointer to the OSP layer dt_object
683  * \param[in] attr      pointer to the attribute to be set
684  * \param[in] th        pointer to the transaction handler
685  *
686  * \retval              0 for success
687  * \retval              negative error number on failure
688  */
689 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
690                                 const struct lu_attr *attr, struct thandle *th)
691 {
692         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
693         struct osp_object       *o = dt2osp_obj(dt);
694         int                     rc;
695
696         if (is_only_remote_trans(th))
697                 return osp_md_declare_attr_set(env, dt, attr, th);
698         /*
699          * Usually we don't allow server stack to manipulate size
700          * but there is a special case when striping is created
701          * late, after stripeless file got truncated to non-zero.
702          *
703          * In this case we do the following:
704          *
705          * 1) grab id in declare - this can lead to leaked OST objects
706          *    but we don't currently have proper mechanism and the only
707          *    options we have are to do truncate RPC holding transaction
708          *    open (very bad) or to grab id in declare at cost of leaked
709          *    OST object in same very rare unfortunate case (just bad)
710          *    notice 1.6-2.0 do assignment outside of running transaction
711          *    all the time, meaning many more chances for leaked objects.
712          *
713          * 2) send synchronous truncate RPC with just assigned id
714          */
715
716         /* there are few places in MDD code still passing NULL
717          * XXX: to be fixed soon */
718         if (attr == NULL)
719                 RETURN(0);
720
721         if (attr->la_valid & LA_SIZE && attr->la_size > 0 &&
722             fid_is_zero(lu_object_fid(&o->opo_obj.do_lu))) {
723                 LASSERT(!dt_object_exists(dt));
724                 osp_object_assign_fid(env, d, o);
725                 rc = osp_object_truncate(env, dt, attr->la_size);
726                 if (rc != 0)
727                         RETURN(rc);
728         }
729
730         if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
731                 RETURN(0);
732
733         /* track all UID/GID, projid, and layout version changes via llog */
734         rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
735
736         return 0;
737 }
738
739 /**
740  * Implement OSP layer dt_object_operations::do_attr_set() interface.
741  *
742  * Set attribute to the specified OST object.
743  *
744  * If the transaction is a remote one, then add OUT_ATTR_SET sub-request
745  * in the OUT RPC that will be flushed when the remote transaction stop.
746  * Otherwise, it will generate a MDS_SETATTR64_REC record in the llog that
747  * will be handled by a dedicated thread asynchronously.
748  *
749  * If the attribute entry exists in the OSP object attributes cache,
750  * then update the cached attribute according to given attribute.
751  *
752  * \param[in] env       pointer to the thread context
753  * \param[in] dt        pointer to the OSP layer dt_object
754  * \param[in] attr      pointer to the attribute to be set
755  * \param[in] th        pointer to the transaction handler
756  *
757  * \retval              0 for success
758  * \retval              negative error number on failure
759  */
760 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
761                         const struct lu_attr *attr, struct thandle *th)
762 {
763         struct osp_object       *o = dt2osp_obj(dt);
764         int                      rc = 0;
765         ENTRY;
766
767         /* we're interested in uid/gid/projid/layout version changes,
768          * and also specific setting of enc flag
769          */
770         if (!(attr->la_valid & LA_REMOTE_ATTR_SET) &&
771             !(attr->la_valid == LA_FLAGS &&
772               attr->la_flags == LUSTRE_ENCRYPT_FL))
773                 RETURN(0);
774
775         if (!is_only_remote_trans(th)) {
776                 if (attr->la_flags & LUSTRE_SET_SYNC_FL) {
777                         struct ptlrpc_request *req = NULL;
778                         struct osp_update_request *update = NULL;
779                         struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
780
781                         update = osp_update_request_create(&osp->opd_dt_dev);
782                         if (IS_ERR(update))
783                                 RETURN(PTR_ERR(update));
784
785                         rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update,
786                                                  lu_object_fid(&dt->do_lu),
787                                                  attr);
788                         if (rc != 0) {
789                                 CERROR("%s: update error "DFID": rc = %d\n",
790                                        osp->opd_obd->obd_name,
791                                        PFID(lu_object_fid(&dt->do_lu)), rc);
792
793                                 osp_update_request_destroy(env, update);
794                                 RETURN(rc);
795                         }
796
797                         rc = osp_remote_sync(env, osp, update, &req);
798                         if (req != NULL)
799                                 ptlrpc_req_finished(req);
800
801                         osp_update_request_destroy(env, update);
802                 } else {
803                         struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
804
805                         rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
806                         /* send layout version to OST ASAP */
807                         if (attr->la_valid & LA_LAYOUT_VERSION)
808                                 wake_up(&osp->opd_sync_waitq);
809                         /* XXX: send new uid/gid to OST ASAP? */
810                 }
811         } else {
812                 struct lu_attr  *la;
813
814                 /* It is for OST-object attr_set directly without updating
815                  * local MDT-object attribute. It is usually used by LFSCK. */
816                 rc = osp_md_attr_set(env, dt, attr, th);
817                 CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n",
818                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
819
820                 if (rc != 0)
821                         RETURN(rc);
822
823                 /* Update the OSP object attributes cache. */
824                 la = &o->opo_attr;
825                 spin_lock(&o->opo_lock);
826                 if (attr->la_valid & LA_UID) {
827                         la->la_uid = attr->la_uid;
828                         la->la_valid |= LA_UID;
829                 }
830
831                 if (attr->la_valid & LA_GID) {
832                         la->la_gid = attr->la_gid;
833                         la->la_valid |= LA_GID;
834                 }
835                 if (attr->la_valid & LA_PROJID) {
836                         la->la_projid = attr->la_projid;
837                         la->la_valid |= LA_PROJID;
838                 }
839                 spin_unlock(&o->opo_lock);
840         }
841
842         RETURN(rc);
843 }
844
845 /**
846  * Interpreter function for getting OSP object extended attribute asynchronously
847  *
848  * Called to interpret the result of an async mode RPC for getting the
849  * OSP object extended attribute.
850  *
851  * \param[in] env       pointer to the thread context
852  * \param[in] reply     pointer to the RPC reply
853  * \param[in] req       pointer to the RPC request
854  * \param[in] obj       pointer to the OSP object
855  * \param[out] data     pointer to OSP object attributes cache
856  * \param[in] index     the index of the attribute buffer in the reply
857  * \param[in] rc        the result for handling the RPC
858  *
859  * \retval              0 for success
860  * \retval              negative error number on failure
861  */
862 static int osp_xattr_get_interpterer(const struct lu_env *env,
863                                      struct object_update_reply *reply,
864                                      struct ptlrpc_request *req,
865                                      struct osp_object *obj,
866                                      void *data, int index, int rc)
867 {
868         struct osp_xattr_entry *oxe = data;
869
870         spin_lock(&obj->opo_lock);
871         if (rc >= 0) {
872                 struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
873
874                 rc = object_update_result_data_get(reply, rbuf, index);
875                 if (rc == -ENOENT || rc == -ENODATA || rc == 0) {
876                         oxe->oxe_exist = 0;
877                         oxe->oxe_ready = 1;
878                         goto unlock;
879                 }
880
881                 if (unlikely(rc < 0) || !oxe_can_hold(oxe, rbuf->lb_len)) {
882                         oxe->oxe_ready = 0;
883                         goto unlock;
884                 }
885
886                 __osp_oac_xattr_assignment(obj, oxe, rbuf);
887         } else if (rc == -ENOENT || rc == -ENODATA) {
888                 oxe->oxe_exist = 0;
889                 oxe->oxe_ready = 1;
890         } else {
891                 oxe->oxe_ready = 0;
892         }
893
894 unlock:
895         spin_unlock(&obj->opo_lock);
896
897         /* Put the reference obtained in the osp_declare_xattr_get(). */
898         osp_oac_xattr_put(oxe);
899
900         return 0;
901 }
902
903 /**
904  * Implement OSP dt_object_operations::do_declare_xattr_get() interface.
905  *
906  * Declare that the caller will get extended attribute from the specified
907  * OST object.
908  *
909  * This function will add an OUT_XATTR_GET sub-request to the per OSP
910  * based shared asynchronous request queue with the interpreter function:
911  * osp_xattr_get_interpterer().
912  *
913  * \param[in] env       pointer to the thread context
914  * \param[in] dt        pointer to the OSP layer dt_object
915  * \param[out] buf      pointer to the lu_buf to hold the extended attribute
916  * \param[in] name      the name for the expected extended attribute
917  *
918  * \retval              0 for success
919  * \retval              negative error number on failure
920  */
921 static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
922                                  struct lu_buf *buf, const char *name)
923 {
924         struct osp_object       *obj     = dt2osp_obj(dt);
925         struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
926         struct osp_xattr_entry  *oxe;
927         int                      rc      = 0;
928         __u16 len;
929
930         LASSERT(buf != NULL);
931         LASSERT(name != NULL);
932
933         if (unlikely(buf->lb_len == 0))
934                 return -EINVAL;
935
936         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
937         if (oxe == NULL)
938                 return -ENOMEM;
939
940         len = strlen(name) + 1;
941         mutex_lock(&osp->opd_async_requests_mutex);
942         rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
943                                       &len, (const void **)&name,
944                                       oxe, buf->lb_len,
945                                       osp_xattr_get_interpterer);
946         if (rc != 0) {
947                 mutex_unlock(&osp->opd_async_requests_mutex);
948                 osp_oac_xattr_put(oxe);
949         } else {
950                 struct osp_update_request *our;
951                 struct osp_update_request_sub *ours;
952
953                 /* XXX: Currently, we trigger the batched async OUT
954                  *      RPC via dt_declare_xattr_get(). It is not
955                  *      perfect solution, but works well now.
956                  *
957                  *      We will improve it in the future. */
958                 our = osp->opd_async_requests;
959                 ours = osp_current_object_update_request(our);
960                 if (ours != NULL && ours->ours_req != NULL &&
961                     ours->ours_req->ourq_count > 0) {
962                         osp->opd_async_requests = NULL;
963                         mutex_unlock(&osp->opd_async_requests_mutex);
964                         rc = osp_unplug_async_request(env, osp, our);
965                 } else {
966                         mutex_unlock(&osp->opd_async_requests_mutex);
967                 }
968         }
969
970         return rc;
971 }
972
973 /**
974  * Implement OSP layer dt_object_operations::do_xattr_get() interface.
975  *
976  * Get extended attribute from the specified MDT/OST object.
977  *
978  * If the extended attribute is in the OSP object attributes cache, then
979  * return the cached extended attribute directly. Otherwise it will get
980  * the extended attribute synchronously, if successful, add it to the OSP
981  * attributes cache. (\see lustre/osp/osp_trans.c for OUT RPC.)
982  *
983  * There is a race condition: some other thread has added the named extended
984  * attributed entry to the OSP object attributes cache during the current
985  * OUT_XATTR_GET handling. If such case happens, the OSP will replace the
986  * (just) existing extended attribute entry with the new replied one.
987  *
988  * \param[in] env       pointer to the thread context
989  * \param[in] dt        pointer to the OSP layer dt_object
990  * \param[out] buf      pointer to the lu_buf to hold the extended attribute
991  * \param[in] name      the name for the expected extended attribute
992  *
993  * \retval              0 for success
994  * \retval              negative error number on failure
995  */
996 int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
997                   struct lu_buf *buf, const char *name)
998 {
999         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
1000         struct osp_object       *obj    = dt2osp_obj(dt);
1001         struct dt_device        *dev    = &osp->opd_dt_dev;
1002         struct lu_buf           *rbuf   = &osp_env_info(env)->osi_lb2;
1003         struct osp_update_request *update = NULL;
1004         struct ptlrpc_request   *req    = NULL;
1005         struct object_update_reply *reply;
1006         struct osp_xattr_entry  *oxe    = NULL;
1007         const char *dname = osp_dto2name(obj);
1008         int invalidated, rc = 0;
1009         ENTRY;
1010
1011         LASSERT(buf != NULL);
1012         LASSERT(name != NULL);
1013
1014         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
1015             osp->opd_index == cfs_fail_val) {
1016                 if (is_ost_obj(&dt->do_lu)) {
1017                         if (osp_dev2node(osp) == cfs_fail_val)
1018                                 RETURN(-ENOTCONN);
1019                 } else {
1020                         if (strcmp(name, XATTR_NAME_LINK) == 0)
1021                                 RETURN(-ENOTCONN);
1022                 }
1023         }
1024
1025         if (unlikely(obj->opo_non_exist))
1026                 RETURN(-ENOENT);
1027
1028         invalidated = atomic_read(&obj->opo_invalidate_seq);
1029
1030         oxe = osp_oac_xattr_find(obj, name, false);
1031         if (oxe != NULL) {
1032                 spin_lock(&obj->opo_lock);
1033                 if (oxe->oxe_ready) {
1034                         if (!oxe->oxe_exist)
1035                                 GOTO(unlock, rc = -ENODATA);
1036
1037                         if (buf->lb_buf == NULL)
1038                                 GOTO(unlock, rc = oxe->oxe_vallen);
1039
1040                         if (buf->lb_len < oxe->oxe_vallen)
1041                                 GOTO(unlock, rc = -ERANGE);
1042
1043                         memcpy(buf->lb_buf, oxe->oxe_value,
1044                                oxe->oxe_vallen);
1045
1046                         GOTO(unlock, rc = oxe->oxe_vallen);
1047
1048 unlock:
1049                         spin_unlock(&obj->opo_lock);
1050                         osp_oac_xattr_put(oxe);
1051
1052                         return rc;
1053                 }
1054                 spin_unlock(&obj->opo_lock);
1055         }
1056         update = osp_update_request_create(dev);
1057         if (IS_ERR(update))
1058                 GOTO(out_req, rc = PTR_ERR(update));
1059
1060         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_get_pack, update,
1061                                  lu_object_fid(&dt->do_lu), name, buf->lb_len);
1062         if (rc != 0) {
1063                 CERROR("%s: Insert update error "DFID": rc = %d\n",
1064                        dname, PFID(lu_object_fid(&dt->do_lu)), rc);
1065                 GOTO(out_req, rc);
1066         }
1067
1068         rc = osp_remote_sync(env, osp, update, &req);
1069
1070         down_read(&obj->opo_invalidate_sem);
1071         if (invalidated != atomic_read(&obj->opo_invalidate_seq)) {
1072                 /* invalidated has been requested, we can't cache the result */
1073                 if (rc < 0) {
1074                         if (rc == -ENOENT)
1075                                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
1076                         GOTO(out, rc);
1077                 }
1078                 reply = req_capsule_server_sized_get(&req->rq_pill,
1079                                                      &RMF_OUT_UPDATE_REPLY,
1080                                 OUT_UPDATE_REPLY_SIZE);
1081                 if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
1082                         CERROR("%s: Wrong version %x expected %x "DFID
1083                                ": rc = %d\n", dname, reply->ourp_magic,
1084                                UPDATE_REPLY_MAGIC,
1085                                PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
1086                         GOTO(out, rc = -EPROTO);
1087                 }
1088                 rc = object_update_result_data_get(reply, rbuf, 0);
1089                 GOTO(out, rc);
1090         }
1091
1092         if (rc < 0) {
1093                 if (rc == -ENOENT) {
1094                         dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
1095                         obj->opo_non_exist = 1;
1096                 }
1097
1098                 if (oxe == NULL)
1099                         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
1100
1101                 if (oxe == NULL) {
1102                         CWARN("%s: Fail to add xattr (%s) to cache for "
1103                               DFID" (1): rc = %d\n", dname, name,
1104                               PFID(lu_object_fid(&dt->do_lu)), rc);
1105
1106                         GOTO(out, rc);
1107                 }
1108
1109                 spin_lock(&obj->opo_lock);
1110                 if (rc == -ENOENT || rc == -ENODATA) {
1111                         oxe->oxe_exist = 0;
1112                         oxe->oxe_ready = 1;
1113                 } else {
1114                         oxe->oxe_ready = 0;
1115                 }
1116                 spin_unlock(&obj->opo_lock);
1117
1118                 GOTO(out, rc);
1119         }
1120
1121         reply = req_capsule_server_sized_get(&req->rq_pill,
1122                                              &RMF_OUT_UPDATE_REPLY,
1123                                              OUT_UPDATE_REPLY_SIZE);
1124         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
1125                 CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
1126                        dname, reply->ourp_magic, UPDATE_REPLY_MAGIC,
1127                        PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
1128
1129                 GOTO(out, rc = -EPROTO);
1130         }
1131
1132         rc = object_update_result_data_get(reply, rbuf, 0);
1133         if (rc < 0 || rbuf->lb_len == 0) {
1134                 if (oxe == NULL && rc == -ENODATA) {
1135                         oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
1136                         if (oxe == NULL) {
1137                                 rc = -ENOMEM;
1138                                 CWARN("%s: Fail to add xattr (%s) to cache for "
1139                                       DFID" (1): rc = %d\n", dname, name,
1140                                       PFID(lu_object_fid(&dt->do_lu)), rc);
1141                                 GOTO(out, rc);
1142                         }
1143                 }
1144
1145                 if (oxe) {
1146                         spin_lock(&obj->opo_lock);
1147                         if (unlikely(rc == -ENODATA)) {
1148                                 oxe->oxe_exist = 0;
1149                                 oxe->oxe_ready = 1;
1150                         } else {
1151                                 oxe->oxe_ready = 0;
1152                         }
1153                         spin_unlock(&obj->opo_lock);
1154                 }
1155
1156                 GOTO(out, rc);
1157         }
1158
1159         /* For detecting EA size. */
1160         if (!buf->lb_buf)
1161                 GOTO(out, rc);
1162
1163         if (!oxe) {
1164                 oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
1165                 if (!oxe) {
1166                         CWARN("%s: Fail to add xattr (%s) to "
1167                               "cache for "DFID" (2): rc = %d\n",
1168                               dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
1169
1170                         GOTO(out, rc);
1171                 }
1172         }
1173
1174         oxe = osp_oac_xattr_assignment(obj, oxe, rbuf);
1175
1176         GOTO(out, rc);
1177
1178 out:
1179         up_read(&obj->opo_invalidate_sem);
1180
1181 out_req:
1182         if (rc > 0 && buf->lb_buf) {
1183                 if (unlikely(buf->lb_len < rbuf->lb_len))
1184                         rc = -ERANGE;
1185                 else
1186                         memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
1187         }
1188
1189         if (req)
1190                 ptlrpc_req_finished(req);
1191
1192         if (update && !IS_ERR(update))
1193                 osp_update_request_destroy(env, update);
1194
1195         if (oxe)
1196                 osp_oac_xattr_put(oxe);
1197
1198         return rc;
1199 }
1200
1201 /**
1202  * Implement OSP layer dt_object_operations::do_declare_xattr_set() interface.
1203  *
1204  * Declare that the caller will set extended attribute to the specified
1205  * MDT/OST object.
1206  *
1207  * If it is non-remote transaction, it will add an OUT_XATTR_SET sub-request
1208  * to the OUT RPC that will be flushed when the transaction start. And if the
1209  * OSP attributes cache is initialized, then check whether the name extended
1210  * attribute entry exists in the cache or not. If yes, replace it; otherwise,
1211  * add the extended attribute to the cache.
1212  *
1213  * \param[in] env       pointer to the thread context
1214  * \param[in] dt        pointer to the OSP layer dt_object
1215  * \param[in] buf       pointer to the lu_buf to hold the extended attribute
1216  * \param[in] name      the name of the extended attribute to be set
1217  * \param[in] flag      to indicate the detailed set operation: LU_XATTR_CREATE
1218  *                      or LU_XATTR_REPLACE or others
1219  * \param[in] th        pointer to the transaction handler
1220  *
1221  * \retval              0 for success
1222  * \retval              negative error number on failure
1223  */
1224 int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
1225                           const struct lu_buf *buf, const char *name,
1226                           int flag, struct thandle *th)
1227 {
1228         return osp_trans_update_request_create(th);
1229 }
1230
1231 /**
1232  * Implement OSP layer dt_object_operations::do_xattr_set() interface.
1233  *
1234  * Set extended attribute to the specified MDT/OST object.
1235  *
1236  * Add an OUT_XATTR_SET sub-request into the OUT RPC that will be flushed in
1237  * the transaction stop. And if the OSP attributes cache is initialized, then
1238  * check whether the name extended attribute entry exists in the cache or not.
1239  * If yes, replace it; otherwise, add the extended attribute to the cache.
1240  *
1241  * \param[in] env       pointer to the thread context
1242  * \param[in] dt        pointer to the OSP layer dt_object
1243  * \param[in] buf       pointer to the lu_buf to hold the extended attribute
1244  * \param[in] name      the name of the extended attribute to be set
1245  * \param[in] fl        to indicate the detailed set operation: LU_XATTR_CREATE
1246  *                      or LU_XATTR_REPLACE or others
1247  * \param[in] th        pointer to the transaction handler
1248  *
1249  * \retval              0 for success
1250  * \retval              negative error number on failure
1251  */
1252 int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
1253                   const struct lu_buf *buf, const char *name, int fl,
1254                   struct thandle *th)
1255 {
1256         struct osp_object *o = dt2osp_obj(dt);
1257         struct osp_update_request *update;
1258         struct osp_xattr_entry *oxe;
1259         int rc;
1260         ENTRY;
1261
1262         update = thandle_to_osp_update_request(th);
1263         LASSERT(update != NULL);
1264
1265         CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
1266                PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
1267
1268         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_set_pack, update,
1269                                  lu_object_fid(&dt->do_lu), buf, name, fl);
1270         if (rc != 0)
1271                 RETURN(rc);
1272
1273         /* Do not cache linkEA that may be self-adjusted by peers
1274          * under EA overflow case. */
1275         if (strcmp(name, XATTR_NAME_LINK) == 0) {
1276                 oxe = osp_oac_xattr_find(o, name, true);
1277                 if (oxe != NULL)
1278                         osp_oac_xattr_put(oxe);
1279
1280                 RETURN(0);
1281         }
1282
1283         oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
1284         if (oxe == NULL) {
1285                 CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
1286                       osp_dto2name(o), name, PFID(lu_object_fid(&dt->do_lu)));
1287
1288                 RETURN(0);
1289         }
1290
1291         oxe = osp_oac_xattr_assignment(o, oxe, buf);
1292         if (oxe)
1293                 osp_oac_xattr_put(oxe);
1294
1295         RETURN(0);
1296 }
1297
1298 /**
1299  * Implement OSP layer dt_object_operations::do_declare_xattr_del() interface.
1300  *
1301  * Declare that the caller will delete extended attribute on the specified
1302  * MDT/OST object.
1303  *
1304  * If it is non-remote transaction, it will add an OUT_XATTR_DEL sub-request
1305  * to the OUT RPC that will be flushed when the transaction start. And if the
1306  * name extended attribute entry exists in the OSP attributes cache, then remove
1307  * it from the cache.
1308  *
1309  * \param[in] env       pointer to the thread context
1310  * \param[in] dt        pointer to the OSP layer dt_object
1311  * \param[in] name      the name of the extended attribute to be set
1312  * \param[in] th        pointer to the transaction handler
1313  *
1314  * \retval              0 for success
1315  * \retval              negative error number on failure
1316  */
1317 int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
1318                           const char *name, struct thandle *th)
1319 {
1320         return osp_trans_update_request_create(th);
1321 }
1322
1323 /**
1324  * Implement OSP layer dt_object_operations::do_xattr_del() interface.
1325  *
1326  * Delete extended attribute on the specified MDT/OST object.
1327  *
1328  * If it is remote transaction, it will add an OUT_XATTR_DEL sub-request into
1329  * the OUT RPC that will be flushed when the transaction stop. And if the name
1330  * extended attribute entry exists in the OSP attributes cache, then remove it
1331  * from the cache.
1332  *
1333  * \param[in] env       pointer to the thread context
1334  * \param[in] dt        pointer to the OSP layer dt_object
1335  * \param[in] name      the name of the extended attribute to be set
1336  * \param[in] th        pointer to the transaction handler
1337  *
1338  * \retval              0 for success
1339  * \retval              negative error number on failure
1340  */
1341 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
1342                   const char *name, struct thandle *th)
1343 {
1344         struct osp_update_request *update;
1345         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1346         struct osp_object *o = dt2osp_obj(dt);
1347         struct osp_xattr_entry *oxe;
1348         int rc;
1349
1350         update = thandle_to_osp_update_request(th);
1351         LASSERT(update != NULL);
1352
1353         rc = OSP_UPDATE_RPC_PACK(env, out_xattr_del_pack, update, fid, name);
1354         if (rc != 0)
1355                 return rc;
1356
1357         oxe = osp_oac_xattr_find(o, name, true);
1358         if (oxe != NULL)
1359                 /* Drop the ref for entry on list. */
1360                 osp_oac_xattr_put(oxe);
1361
1362         return 0;
1363 }
1364
1365 void osp_obj_invalidate_cache(struct osp_object *obj)
1366 {
1367         struct osp_xattr_entry *oxe;
1368         struct osp_xattr_entry *tmp;
1369
1370         spin_lock(&obj->opo_lock);
1371         list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
1372                 oxe->oxe_ready = 0;
1373                 list_del_init(&oxe->oxe_list);
1374                 osp_oac_xattr_put(oxe);
1375         }
1376         obj->opo_attr.la_valid = 0;
1377         spin_unlock(&obj->opo_lock);
1378 }
1379
1380 /**
1381  * Implement OSP layer dt_object_operations::do_invalidate() interface.
1382  *
1383  * Invalidate attributes cached on the specified MDT/OST object.
1384  *
1385  * \param[in] env       pointer to the thread context
1386  * \param[in] dt        pointer to the OSP layer dt_object
1387  *
1388  * \retval              0 for success
1389  * \retval              negative error number on failure
1390  */
1391 int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
1392 {
1393         struct osp_object *obj = dt2osp_obj(dt);
1394         ENTRY;
1395
1396         CDEBUG(D_HA, "Invalidate osp_object "DFID"\n",
1397                PFID(lu_object_fid(&dt->do_lu)));
1398
1399         /* serialize attr/EA set vs. invalidation */
1400         down_write(&obj->opo_invalidate_sem);
1401
1402         /* this should invalidate all in-flights */
1403         atomic_inc(&obj->opo_invalidate_seq);
1404
1405         spin_lock(&obj->opo_lock);
1406         /* do not mark new objects stale */
1407         if (obj->opo_attr.la_valid)
1408                 obj->opo_stale = 1;
1409         obj->opo_non_exist = 0;
1410         spin_unlock(&obj->opo_lock);
1411
1412         osp_obj_invalidate_cache(obj);
1413
1414         up_write(&obj->opo_invalidate_sem);
1415
1416         RETURN(0);
1417 }
1418
1419 bool osp_check_stale(struct dt_object *dt)
1420 {
1421         struct osp_object *obj = dt2osp_obj(dt);
1422
1423         if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
1424                 return true;
1425
1426         return obj->opo_stale;
1427 }
1428
1429
1430 /**
1431  * Implement OSP layer dt_object_operations::do_declare_create() interface.
1432  *
1433  * Declare that the caller will create the OST object.
1434  *
1435  * If the transaction is a remote transaction and the FID for the OST-object
1436  * has been assigned already, then handle it as creating (remote) MDT object
1437  * via osp_md_declare_create(). This function is usually used for LFSCK
1438  * to re-create the lost OST object. Otherwise, if it is not replay case, the
1439  * OSP will reserve pre-created object for the subsequent create operation;
1440  * if the MDT side cached pre-created objects are less than some threshold,
1441  * then it will wakeup the pre-create thread.
1442  *
1443  * \param[in] env       pointer to the thread context
1444  * \param[in] dt        pointer to the OSP layer dt_object
1445  * \param[in] attr      the attribute for the object to be created
1446  * \param[in] hint      pointer to the hint for creating the object, such as
1447  *                      the parent object
1448  * \param[in] dof       pointer to the dt_object_format for help the creation
1449  * \param[in] th        pointer to the transaction handler
1450  *
1451  * \retval              0 for success
1452  * \retval              negative error number on failure
1453  */
1454 static int osp_declare_create(const struct lu_env *env, struct dt_object *dt,
1455                               struct lu_attr *attr,
1456                               struct dt_allocation_hint *hint,
1457                               struct dt_object_format *dof, struct thandle *th)
1458 {
1459         struct osp_thread_info  *osi = osp_env_info(env);
1460         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
1461         struct osp_object       *o = dt2osp_obj(dt);
1462         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1463         struct thandle          *local_th;
1464         int                      rc = 0;
1465
1466         ENTRY;
1467
1468         if (is_only_remote_trans(th) && !fid_is_zero(fid)) {
1469                 LASSERT(fid_is_sane(fid));
1470
1471                 rc = osp_md_declare_create(env, dt, attr, hint, dof, th);
1472
1473                 RETURN(rc);
1474         }
1475
1476         /* should happen to non-0 OSP only so that at least one object
1477          * has been already declared in the scenario and LOD should
1478          * cleanup that */
1479         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
1480                 RETURN(-ENOSPC);
1481
1482         LASSERT(d->opd_last_used_oid_file);
1483
1484         /*
1485          * There can be gaps in precreated ids and record to unlink llog
1486          * XXX: we do not handle gaps yet, implemented before solution
1487          *      was found to be racy, so we disabled that. there is no
1488          *      point in making useless but expensive llog declaration.
1489          */
1490         /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
1491
1492         local_th = osp_get_storage_thandle(env, th, d);
1493         if (IS_ERR(local_th))
1494                 RETURN(PTR_ERR(local_th));
1495
1496         if (unlikely(!fid_is_zero(fid))) {
1497                 /* replay case: caller knows fid */
1498                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
1499                                    d->opd_index);
1500                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1501                                              &osi->osi_lb, osi->osi_off,
1502                                              local_th);
1503                 RETURN(rc);
1504         }
1505
1506         /*
1507          * in declaration we need to reserve object so that we don't block
1508          * awaiting precreation RPC to complete
1509          */
1510         rc = osp_precreate_reserve(env, d, !hint || hint->dah_can_block);
1511         /*
1512          * we also need to declare update to local "last used id" file for
1513          * recovery if object isn't used for a reason, we need to release
1514          * reservation, this can be made in osd_object_release()
1515          */
1516         if (rc == 0) {
1517                 /* mark id is reserved: in create we don't want to talk
1518                  * to OST */
1519                 LASSERT(o->opo_reserved == 0);
1520                 o->opo_reserved = 1;
1521
1522                 /* common for all OSPs file hystorically */
1523                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
1524                                    d->opd_index);
1525                 rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
1526                                              &osi->osi_lb, osi->osi_off,
1527                                              local_th);
1528         } else {
1529                 /* not needed in the cache anymore */
1530                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1531                             &dt->do_lu.lo_header->loh_flags);
1532         }
1533         RETURN(rc);
1534 }
1535
1536 /**
1537  * Implement OSP layer dt_object_operations::do_create() interface.
1538  *
1539  * Create the OST object.
1540  *
1541  * If the transaction is a remote transaction and the FID for the OST-object
1542  * has been assigned already, then handle it as handling MDT object via the
1543  * osp_md_create(). For other cases, the OSP will assign FID to the
1544  * object to be created, and update last_used Object ID (OID) file.
1545  *
1546  * \param[in] env       pointer to the thread context
1547  * \param[in] dt        pointer to the OSP layer dt_object
1548  * \param[in] attr      the attribute for the object to be created
1549  * \param[in] hint      pointer to the hint for creating the object, such as
1550  *                      the parent object
1551  * \param[in] dof       pointer to the dt_object_format for help the creation
1552  * \param[in] th        pointer to the transaction handler
1553  *
1554  * \retval              0 for success
1555  * \retval              negative error number on failure
1556  */
1557 static int osp_create(const struct lu_env *env, struct dt_object *dt,
1558                       struct lu_attr *attr, struct dt_allocation_hint *hint,
1559                       struct dt_object_format *dof, struct thandle *th)
1560 {
1561         struct osp_thread_info  *osi = osp_env_info(env);
1562         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
1563         struct osp_object       *o = dt2osp_obj(dt);
1564         int                     rc = 0;
1565         struct lu_fid           *fid = &osi->osi_fid;
1566         struct thandle          *local_th;
1567         ENTRY;
1568
1569         if (is_only_remote_trans(th) &&
1570             !fid_is_zero(lu_object_fid(&dt->do_lu))) {
1571                 LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu)));
1572
1573                 rc = osp_md_create(env, dt, attr, hint, dof, th);
1574                 if (rc == 0)
1575                         o->opo_non_exist = 0;
1576
1577                 RETURN(rc);
1578         }
1579
1580         o->opo_non_exist = 0;
1581         if (o->opo_reserved) {
1582                 /* regular case, fid is assigned holding transaction open */
1583                  osp_object_assign_fid(env, d, o);
1584         }
1585
1586         memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
1587
1588         LASSERTF(fid_is_sane(fid), "fid for osp_object %p is insane"DFID"!\n",
1589                  o, PFID(fid));
1590
1591         if (!o->opo_reserved) {
1592                 /* special case, id was assigned outside of transaction
1593                  * see comments in osp_declare_attr_set */
1594                 LASSERT(d->opd_pre != NULL);
1595                 spin_lock(&d->opd_pre_lock);
1596                 osp_update_last_fid(d, fid);
1597                 spin_unlock(&d->opd_pre_lock);
1598         }
1599
1600         CDEBUG(D_INODE, "fid for osp_object %p is "DFID"\n", o, PFID(fid));
1601
1602         /* If the precreate ends, it means it will be ready to rollover to
1603          * the new sequence soon, all the creation should be synchronized,
1604          * otherwise during replay, the replay fid will be inconsistent with
1605          * last_used/create fid */
1606         if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
1607                 th->th_sync = 1;
1608
1609         local_th = osp_get_storage_thandle(env, th, d);
1610         if (IS_ERR(local_th))
1611                 RETURN(PTR_ERR(local_th));
1612         /*
1613          * it's OK if the import is inactive by this moment - id was created
1614          * by OST earlier, we just need to maintain it consistently on the disk
1615          * once import is reconnected, OSP will claim this and other objects
1616          * used and OST either keep them, if they exist or recreate
1617          */
1618
1619         /* we might have lost precreated objects */
1620         if (unlikely(d->opd_gap_count) > 0) {
1621                 LASSERT(d->opd_pre != NULL);
1622                 spin_lock(&d->opd_pre_lock);
1623                 if (d->opd_gap_count > 0) {
1624                         int count = d->opd_gap_count;
1625
1626                         rc = ostid_set_id(&osi->osi_oi,
1627                                           fid_oid(&d->opd_gap_start_fid));
1628                         if (rc) {
1629                                 spin_unlock(&d->opd_pre_lock);
1630                                 RETURN(rc);
1631                         }
1632                         d->opd_gap_count = 0;
1633                         spin_unlock(&d->opd_pre_lock);
1634
1635                         CDEBUG(D_HA, "Writing gap "DFID"+%d in llog\n",
1636                                PFID(&d->opd_gap_start_fid), count);
1637                         /* real gap handling is disabled intil ORI-692 will be
1638                          * fixed, now we only report gaps */
1639                 } else {
1640                         spin_unlock(&d->opd_pre_lock);
1641                 }
1642         }
1643
1644         /* Only need update last_used oid file, seq file will only be update
1645          * during seq rollover */
1646         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
1647                            &d->opd_last_id, d->opd_index);
1648
1649         rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
1650                              &osi->osi_off, local_th);
1651
1652         CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n",
1653                d->opd_obd->obd_name, PFID(fid), d->opd_index, rc);
1654
1655         RETURN(rc);
1656 }
1657
1658 /**
1659  * Implement OSP layer dt_object_operations::do_declare_destroy() interface.
1660  *
1661  * Declare that the caller will destroy the specified OST object.
1662  *
1663  * The OST object destroy will be handled via llog asynchronously. This
1664  * function will declare the credits for generating MDS_UNLINK64_REC llog.
1665  *
1666  * \param[in] env       pointer to the thread context
1667  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
1668  * \param[in] th        pointer to the transaction handler
1669  *
1670  * \retval              0 for success
1671  * \retval              negative error number on failure
1672  */
1673 int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt,
1674                         struct thandle *th)
1675 {
1676         struct osp_object       *o = dt2osp_obj(dt);
1677         struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
1678         int                      rc = 0;
1679
1680         ENTRY;
1681
1682         LASSERT(!osp->opd_connect_mdt);
1683
1684         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
1685                 rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1686
1687         RETURN(rc);
1688 }
1689
1690 /**
1691  * Implement OSP layer dt_object_operations::do_destroy() interface.
1692  *
1693  * Destroy the specified OST object.
1694  *
1695  * The OSP generates a MDS_UNLINK64_REC record in the llog. There
1696  * will be some dedicated thread to handle the llog asynchronously.
1697  *
1698  * It also marks the object as non-cached.
1699  *
1700  * \param[in] env       pointer to the thread context
1701  * \param[in] dt        pointer to the OSP layer dt_object to be destroyed
1702  * \param[in] th        pointer to the transaction handler
1703  *
1704  * \retval              0 for success
1705  * \retval              negative error number on failure
1706  */
1707 static int osp_destroy(const struct lu_env *env, struct dt_object *dt,
1708                        struct thandle *th)
1709 {
1710         struct osp_object       *o = dt2osp_obj(dt);
1711         struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
1712         int                      rc = 0;
1713
1714         ENTRY;
1715
1716         o->opo_non_exist = 1;
1717
1718         LASSERT(!osp->opd_connect_mdt);
1719
1720         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) {
1721                 /* once transaction is committed put proper command on
1722                  * the queue going to our OST. */
1723                 rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1724                 if (rc < 0)
1725                         RETURN(rc);
1726         }
1727
1728         /* not needed in cache any more */
1729         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1730
1731         RETURN(rc);
1732 }
1733
1734 static int osp_orphan_index_lookup(const struct lu_env *env,
1735                                    struct dt_object *dt,
1736                                    struct dt_rec *rec,
1737                                    const struct dt_key *key)
1738 {
1739         return -EOPNOTSUPP;
1740 }
1741
1742 static int osp_orphan_index_declare_insert(const struct lu_env *env,
1743                                            struct dt_object *dt,
1744                                            const struct dt_rec *rec,
1745                                            const struct dt_key *key,
1746                                            struct thandle *handle)
1747 {
1748         return -EOPNOTSUPP;
1749 }
1750
1751 static int osp_orphan_index_insert(const struct lu_env *env,
1752                                    struct dt_object *dt,
1753                                    const struct dt_rec *rec,
1754                                    const struct dt_key *key,
1755                                    struct thandle *handle)
1756 {
1757         return -EOPNOTSUPP;
1758 }
1759
1760 static int osp_orphan_index_declare_delete(const struct lu_env *env,
1761                                            struct dt_object *dt,
1762                                            const struct dt_key *key,
1763                                            struct thandle *handle)
1764 {
1765         return -EOPNOTSUPP;
1766 }
1767
1768 static int osp_orphan_index_delete(const struct lu_env *env,
1769                                    struct dt_object *dt,
1770                                    const struct dt_key *key,
1771                                    struct thandle *handle)
1772 {
1773         return -EOPNOTSUPP;
1774 }
1775
1776 /**
1777  * Initialize the OSP layer index iteration.
1778  *
1779  * \param[in] env       pointer to the thread context
1780  * \param[in] dt        pointer to the index object to be iterated
1781  * \param[in] attr      unused
1782  *
1783  * \retval              pointer to the iteration structure
1784  * \retval              negative error number on failure
1785  */
1786 struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
1787                           __u32 attr)
1788 {
1789         struct osp_it *it;
1790
1791         OBD_ALLOC_PTR(it);
1792         if (it == NULL)
1793                 return ERR_PTR(-ENOMEM);
1794
1795         it->ooi_pos_ent = -1;
1796         it->ooi_obj = dt;
1797         it->ooi_attr = attr;
1798
1799         return (struct dt_it *)it;
1800 }
1801
1802 /**
1803  * Finalize the OSP layer index iteration.
1804  *
1805  * \param[in] env       pointer to the thread context
1806  * \param[in] di        pointer to the iteration structure
1807  */
1808 void osp_it_fini(const struct lu_env *env, struct dt_it *di)
1809 {
1810         struct osp_it   *it = (struct osp_it *)di;
1811         struct page     **pages = it->ooi_pages;
1812         int             npages = it->ooi_total_npages;
1813         int             i;
1814
1815         if (pages != NULL) {
1816                 for (i = 0; i < npages; i++) {
1817                         if (pages[i] != NULL) {
1818                                 if (pages[i] == it->ooi_cur_page) {
1819                                         kunmap(pages[i]);
1820                                         it->ooi_cur_page = NULL;
1821                                 }
1822                                 __free_page(pages[i]);
1823                         }
1824                 }
1825                 OBD_FREE_PTR_ARRAY(pages, npages);
1826         }
1827         OBD_FREE_PTR(it);
1828 }
1829
1830 /**
1831  * Get more records for the iteration from peer.
1832  *
1833  * The new records will be filled in an array of pages. The OSP side
1834  * allows 1MB bulk data to be transferred.
1835  *
1836  * \param[in] env       pointer to the thread context
1837  * \param[in] it        pointer to the iteration structure
1838  *
1839  * \retval              0 for success
1840  * \retval              negative error number on failure
1841  */
1842 static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
1843 {
1844         struct lu_device         *dev   = it->ooi_obj->do_lu.lo_dev;
1845         struct osp_device        *osp   = lu2osp_dev(dev);
1846         struct page             **pages;
1847         struct ptlrpc_request    *req   = NULL;
1848         struct ptlrpc_bulk_desc  *desc;
1849         struct idx_info          *ii;
1850         int                       npages;
1851         int                       rc;
1852         int                       i;
1853         ENTRY;
1854
1855         /* 1MB bulk */
1856         npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20);
1857         npages /= PAGE_SIZE;
1858
1859         OBD_ALLOC_PTR_ARRAY(pages, npages);
1860         if (pages == NULL)
1861                 RETURN(-ENOMEM);
1862
1863         it->ooi_pages = pages;
1864         it->ooi_total_npages = npages;
1865         for (i = 0; i < npages; i++) {
1866                 pages[i] = alloc_page(GFP_NOFS);
1867                 if (pages[i] == NULL)
1868                         RETURN(-ENOMEM);
1869         }
1870
1871         req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import,
1872                                    &RQF_OBD_IDX_READ);
1873         if (req == NULL)
1874                 RETURN(-ENOMEM);
1875
1876         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
1877         if (rc != 0) {
1878                 ptlrpc_request_free(req);
1879                 RETURN(rc);
1880         }
1881
1882         osp_set_req_replay(osp, req);
1883         req->rq_request_portal = OUT_PORTAL;
1884         ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
1885         memset(ii, 0, sizeof(*ii));
1886         if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) {
1887                 /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE,
1888                  * ost_index, 0] with LAST_ID FID, so it needs to replace
1889                  * the FID with orphan FID here */
1890                 ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE;
1891                 ii->ii_fid.f_oid = osp->opd_index;
1892                 ii->ii_fid.f_ver = 0;
1893                 ii->ii_flags = II_FL_NOHASH;
1894                 ii->ii_attrs = osp_dev2node(osp);
1895         } else {
1896                 ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu);
1897                 ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY |
1898                                II_FL_VARREC;
1899                 ii->ii_attrs = it->ooi_attr;
1900         }
1901         ii->ii_magic = IDX_INFO_MAGIC;
1902         ii->ii_count = npages * LU_PAGE_COUNT;
1903         ii->ii_hash_start = it->ooi_next;
1904
1905         ptlrpc_at_set_req_timeout(req);
1906
1907         desc = ptlrpc_prep_bulk_imp(req, npages, 1,
1908                                     PTLRPC_BULK_PUT_SINK,
1909                                     MDS_BULK_PORTAL,
1910                                     &ptlrpc_bulk_kiov_pin_ops);
1911         if (desc == NULL)
1912                 GOTO(out, rc = -ENOMEM);
1913
1914         for (i = 0; i < npages; i++)
1915                 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
1916                                                  PAGE_SIZE);
1917
1918         ptlrpc_request_set_replen(req);
1919         rc = ptlrpc_queue_wait(req);
1920         if (rc != 0)
1921                 GOTO(out, rc);
1922
1923         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1924                                           req->rq_bulk->bd_nob_transferred);
1925         if (rc < 0)
1926                 GOTO(out, rc);
1927         rc = 0;
1928
1929         ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
1930         if (ii->ii_magic != IDX_INFO_MAGIC)
1931                  GOTO(out, rc = -EPROTO);
1932
1933         npages = (ii->ii_count + LU_PAGE_COUNT - 1) >>
1934                  (PAGE_SHIFT - LU_PAGE_SHIFT);
1935         if (npages > it->ooi_total_npages) {
1936                 CERROR("%s: returned more pages than expected, %u > %u\n",
1937                        osp->opd_obd->obd_name, npages, it->ooi_total_npages);
1938                 GOTO(out, rc = -EINVAL);
1939         }
1940
1941         it->ooi_rec_size = ii->ii_recsize;
1942         it->ooi_valid_npages = npages;
1943         if (req_capsule_rep_need_swab(&req->rq_pill))
1944                 it->ooi_swab = 1;
1945
1946         it->ooi_next = ii->ii_hash_end;
1947
1948 out:
1949         ptlrpc_req_finished(req);
1950
1951         return rc;
1952 }
1953
1954 /**
1955  * Move the iteration cursor to the next lu_page.
1956  *
1957  * One system page (PAGE_SIZE) may contain multiple lu_page (4KB),
1958  * that depends on the LU_PAGE_COUNT. If it is not the last lu_page
1959  * in current system page, then move the iteration cursor to the next
1960  * lu_page in current system page. Otherwise, if there are more system
1961  * pages in the cache, then move the iteration cursor to the next system
1962  * page. If all the cached records (pages) have been iterated, then fetch
1963  * more records via osp_it_fetch().
1964  *
1965  * \param[in] env       pointer to the thread context
1966  * \param[in] di        pointer to the iteration structure
1967  *
1968  * \retval              positive for end of the directory
1969  * \retval              0 for success
1970  * \retval              negative error number on failure
1971  */
1972 int osp_it_next_page(const struct lu_env *env, struct dt_it *di)
1973 {
1974         struct osp_it           *it = (struct osp_it *)di;
1975         struct lu_idxpage       *idxpage;
1976         struct page             **pages;
1977         int                     rc;
1978         int                     i;
1979         ENTRY;
1980
1981 again2:
1982         idxpage = it->ooi_cur_idxpage;
1983         if (idxpage != NULL) {
1984                 if (idxpage->lip_nr == 0)
1985                         RETURN(1);
1986
1987                 if (it->ooi_pos_ent < idxpage->lip_nr) {
1988                         CDEBUG(D_INFO, "ooi_pos %d nr %d\n",
1989                                (int)it->ooi_pos_ent, (int)idxpage->lip_nr);
1990                         RETURN(0);
1991                 }
1992                 it->ooi_cur_idxpage = NULL;
1993                 it->ooi_pos_lu_page++;
1994
1995 again1:
1996                 if (it->ooi_pos_lu_page < LU_PAGE_COUNT) {
1997                         it->ooi_cur_idxpage = (void *)it->ooi_cur_page +
1998                                          LU_PAGE_SIZE * it->ooi_pos_lu_page;
1999                         if (it->ooi_swab)
2000                                 lustre_swab_lip_header(it->ooi_cur_idxpage);
2001                         if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) {
2002                                 struct osp_device *osp =
2003                                         lu2osp_dev(it->ooi_obj->do_lu.lo_dev);
2004
2005                                 CERROR("%s: invalid magic (%x != %x) for page "
2006                                        "%d/%d while read layout orphan index\n",
2007                                        osp->opd_obd->obd_name,
2008                                        it->ooi_cur_idxpage->lip_magic,
2009                                        LIP_MAGIC, it->ooi_pos_page,
2010                                        it->ooi_pos_lu_page);
2011                                 /* Skip this lu_page next time. */
2012                                 it->ooi_pos_ent = idxpage->lip_nr - 1;
2013                                 RETURN(-EINVAL);
2014                         }
2015                         it->ooi_pos_ent = -1;
2016                         goto again2;
2017                 }
2018
2019                 kunmap(it->ooi_cur_page);
2020                 it->ooi_cur_page = NULL;
2021                 it->ooi_pos_page++;
2022
2023 again0:
2024                 pages = it->ooi_pages;
2025                 if (it->ooi_pos_page < it->ooi_valid_npages) {
2026                         it->ooi_cur_page = kmap(pages[it->ooi_pos_page]);
2027                         it->ooi_pos_lu_page = 0;
2028                         goto again1;
2029                 }
2030
2031                 for (i = 0; i < it->ooi_total_npages; i++) {
2032                         if (pages[i] != NULL)
2033                                 __free_page(pages[i]);
2034                 }
2035                 OBD_FREE_PTR_ARRAY(pages, it->ooi_total_npages);
2036
2037                 it->ooi_pos_page = 0;
2038                 it->ooi_total_npages = 0;
2039                 it->ooi_valid_npages = 0;
2040                 it->ooi_swab = 0;
2041                 it->ooi_ent = NULL;
2042                 it->ooi_cur_page = NULL;
2043                 it->ooi_cur_idxpage = NULL;
2044                 it->ooi_pages = NULL;
2045         }
2046
2047         if (it->ooi_next == II_END_OFF)
2048                 RETURN(1);
2049
2050         rc = osp_it_fetch(env, it);
2051         if (rc == 0)
2052                 goto again0;
2053
2054         RETURN(rc);
2055 }
2056
2057 /**
2058  * Move the iteration cursor to the next record.
2059  *
2060  * If there are more records in the lu_page, then move the iteration
2061  * cursor to the next record directly. Otherwise, move the iteration
2062  * cursor to the record in the next lu_page via osp_it_next_page()
2063  *
2064  * \param[in] env       pointer to the thread context
2065  * \param[in] di        pointer to the iteration structure
2066  *
2067  * \retval              positive for end of the directory
2068  * \retval              0 for success
2069  * \retval              negative error number on failure
2070  */
2071 static int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di)
2072 {
2073         struct osp_it           *it = (struct osp_it *)di;
2074         struct lu_idxpage       *idxpage;
2075         int                     rc;
2076         ENTRY;
2077
2078 again:
2079         idxpage = it->ooi_cur_idxpage;
2080         if (idxpage != NULL) {
2081                 if (idxpage->lip_nr == 0)
2082                         RETURN(1);
2083
2084                 it->ooi_pos_ent++;
2085                 if (it->ooi_pos_ent < idxpage->lip_nr) {
2086                         if (it->ooi_rec_size ==
2087                                         sizeof(struct lu_orphan_rec_v3)) {
2088                                 it->ooi_ent =
2089                                 (struct lu_orphan_ent_v3 *)idxpage->lip_entries+
2090                                                         it->ooi_pos_ent;
2091                                 if (it->ooi_swab)
2092                                         lustre_swab_orphan_ent_v3(it->ooi_ent);
2093                         } else if (it->ooi_rec_size ==
2094                                         sizeof(struct lu_orphan_rec_v2)) {
2095                                 it->ooi_ent =
2096                                 (struct lu_orphan_ent_v2 *)idxpage->lip_entries+
2097                                                         it->ooi_pos_ent;
2098                                 if (it->ooi_swab)
2099                                         lustre_swab_orphan_ent_v2(it->ooi_ent);
2100                         } else {
2101                                 it->ooi_ent =
2102                                 (struct lu_orphan_ent *)idxpage->lip_entries +
2103                                                         it->ooi_pos_ent;
2104                                 if (it->ooi_swab)
2105                                         lustre_swab_orphan_ent(it->ooi_ent);
2106                         }
2107                         RETURN(0);
2108                 }
2109         }
2110
2111         rc = osp_it_next_page(env, di);
2112         if (rc == 0)
2113                 goto again;
2114
2115         RETURN(rc);
2116 }
2117
2118 int osp_it_get(const struct lu_env *env, struct dt_it *di,
2119                const struct dt_key *key)
2120 {
2121         return 1;
2122 }
2123
2124 void osp_it_put(const struct lu_env *env, struct dt_it *di)
2125 {
2126 }
2127
2128 static struct dt_key *osp_orphan_it_key(const struct lu_env *env,
2129                                         const struct dt_it *di)
2130 {
2131         struct osp_it   *it  = (struct osp_it *)di;
2132         struct lu_orphan_ent    *ent = (struct lu_orphan_ent *)it->ooi_ent;
2133
2134         if (likely(ent != NULL))
2135                 return (struct dt_key *)(&ent->loe_key);
2136
2137         return NULL;
2138 }
2139
2140 static int osp_orphan_it_key_size(const struct lu_env *env,
2141                                   const struct dt_it *di)
2142 {
2143         return sizeof(struct lu_fid);
2144 }
2145
2146 static int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di,
2147                              struct dt_rec *rec, __u32 attr)
2148 {
2149         struct osp_it *it = (struct osp_it *)di;
2150
2151         if (likely(it->ooi_ent)) {
2152                 if (it->ooi_rec_size == sizeof(struct lu_orphan_rec_v3)) {
2153                         struct lu_orphan_ent_v3 *ent =
2154                                 (struct lu_orphan_ent_v3 *)it->ooi_ent;
2155
2156                         *(struct lu_orphan_rec_v3 *)rec = ent->loe_rec;
2157                 } else if (it->ooi_rec_size ==
2158                                 sizeof(struct lu_orphan_rec_v2)) {
2159                         struct lu_orphan_ent_v2 *ent =
2160                                 (struct lu_orphan_ent_v2 *)it->ooi_ent;
2161
2162                         *(struct lu_orphan_rec_v2 *)rec = ent->loe_rec;
2163                 } else {
2164                         struct lu_orphan_ent *ent =
2165                                 (struct lu_orphan_ent *)it->ooi_ent;
2166
2167                         *(struct lu_orphan_rec *)rec = ent->loe_rec;
2168                 }
2169                 return 0;
2170         }
2171
2172         return -EINVAL;
2173 }
2174
2175 __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
2176 {
2177         struct osp_it   *it = (struct osp_it *)di;
2178
2179         return it->ooi_next;
2180 }
2181
2182 /**
2183  * Locate the iteration cursor to the specified position (cookie).
2184  *
2185  * \param[in] env       pointer to the thread context
2186  * \param[in] di        pointer to the iteration structure
2187  * \param[in] hash      the specified position
2188  *
2189  * \retval              positive number for locating to the exactly position
2190  *                      or the next
2191  * \retval              0 for arriving at the end of the iteration
2192  * \retval              negative error number on failure
2193  */
2194 int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
2195                        __u64 hash)
2196 {
2197         struct osp_it   *it     = (struct osp_it *)di;
2198         int              rc;
2199
2200         it->ooi_next = hash;
2201         rc = osp_orphan_it_next(env, (struct dt_it *)di);
2202         if (rc == 1)
2203                 return 0;
2204
2205         if (rc == 0)
2206                 return 1;
2207
2208         return rc;
2209 }
2210
2211 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
2212                    void *key_rec)
2213 {
2214         return 0;
2215 }
2216
2217 static const struct dt_index_operations osp_orphan_index_ops = {
2218         .dio_lookup             = osp_orphan_index_lookup,
2219         .dio_declare_insert     = osp_orphan_index_declare_insert,
2220         .dio_insert             = osp_orphan_index_insert,
2221         .dio_declare_delete     = osp_orphan_index_declare_delete,
2222         .dio_delete             = osp_orphan_index_delete,
2223         .dio_it = {
2224                 .init           = osp_it_init,
2225                 .fini           = osp_it_fini,
2226                 .next           = osp_orphan_it_next,
2227                 .get            = osp_it_get,
2228                 .put            = osp_it_put,
2229                 .key            = osp_orphan_it_key,
2230                 .key_size       = osp_orphan_it_key_size,
2231                 .rec            = osp_orphan_it_rec,
2232                 .store          = osp_it_store,
2233                 .load           = osp_orphan_it_load,
2234                 .key_rec        = osp_it_key_rec,
2235         }
2236 };
2237
2238 /**
2239  * Implement OSP layer dt_object_operations::do_index_try() interface.
2240  *
2241  * Negotiate the index type.
2242  *
2243  * If the target index is an IDIF object, then use osp_orphan_index_ops.
2244  * Otherwise, assign osp_md_index_ops to the dt_object::do_index_ops.
2245  * (\see lustre/include/lustre_fid.h for IDIF.)
2246  *
2247  * \param[in] env       pointer to the thread context
2248  * \param[in] dt        pointer to the OSP layer dt_object
2249  * \param[in] feat      unused
2250  *
2251  * \retval              0 for success
2252  */
2253 static int osp_index_try(const struct lu_env *env,
2254                          struct dt_object *dt,
2255                          const struct dt_index_features *feat)
2256 {
2257         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2258
2259         if (fid_is_last_id(fid) && fid_is_idif(fid))
2260                 dt->do_index_ops = &osp_orphan_index_ops;
2261         else
2262                 dt->do_index_ops = &osp_md_index_ops;
2263         return 0;
2264 }
2265
2266 static const struct dt_object_operations osp_obj_ops = {
2267         .do_declare_attr_get    = osp_declare_attr_get,
2268         .do_attr_get            = osp_attr_get,
2269         .do_declare_attr_set    = osp_declare_attr_set,
2270         .do_attr_set            = osp_attr_set,
2271         .do_declare_xattr_get   = osp_declare_xattr_get,
2272         .do_xattr_get           = osp_xattr_get,
2273         .do_declare_xattr_set   = osp_declare_xattr_set,
2274         .do_xattr_set           = osp_xattr_set,
2275         .do_declare_create      = osp_declare_create,
2276         .do_create              = osp_create,
2277         .do_declare_destroy     = osp_declare_destroy,
2278         .do_destroy             = osp_destroy,
2279         .do_index_try           = osp_index_try,
2280 };
2281
2282 /**
2283  * Implement OSP layer lu_object_operations::loo_object_init() interface.
2284  *
2285  * Initialize the object.
2286  *
2287  * If it is a remote MDT object, then call do_attr_get() to fetch
2288  * the attribute from the peer.
2289  *
2290  * \param[in] env       pointer to the thread context
2291  * \param[in] o         pointer to the OSP layer lu_object
2292  * \param[in] conf      unused
2293  *
2294  * \retval              0 for success
2295  * \retval              negative error number on failure
2296  */
2297 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
2298                            const struct lu_object_conf *conf)
2299 {
2300         struct osp_object *po = lu2osp_obj(o);
2301         int rc = 0;
2302
2303         ENTRY;
2304
2305         o->lo_header->loh_attr |= LOHA_REMOTE;
2306
2307         if (is_ost_obj(o)) {
2308                 po->opo_obj.do_ops = &osp_obj_ops;
2309         } else {
2310                 struct lu_attr *la = &osp_env_info(env)->osi_attr;
2311
2312                 po->opo_obj.do_ops = &osp_md_obj_ops;
2313                 po->opo_obj.do_body_ops = &osp_md_body_ops;
2314
2315                 if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
2316                         po->opo_non_exist = 1;
2317                 } else {
2318                         rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
2319                                                              la);
2320                         if (rc == 0)
2321                                 o->lo_header->loh_attr |=
2322                                         LOHA_EXISTS | (la->la_mode & S_IFMT);
2323                         if (rc == -ENOENT) {
2324                                 po->opo_non_exist = 1;
2325                                 rc = 0;
2326                         }
2327                 }
2328         }
2329
2330         RETURN(rc);
2331 }
2332
2333 static void osp_object_free_rcu(struct rcu_head *head)
2334 {
2335         struct osp_object *obj = container_of(head, struct osp_object,
2336                                               opo_header.loh_rcu);
2337
2338         kmem_cache_free(osp_object_kmem, obj);
2339 }
2340
2341 /**
2342  * Implement OSP layer lu_object_operations::loo_object_free() interface.
2343  *
2344  * Finalize the object.
2345  *
2346  * If the OSP object has attributes cache, then destroy the cache.
2347  * Free the object finally.
2348  *
2349  * \param[in] env       pointer to the thread context
2350  * \param[in] o         pointer to the OSP layer lu_object
2351  */
2352 static void osp_object_free(const struct lu_env *env, struct lu_object *o)
2353 {
2354         struct osp_object       *obj = lu2osp_obj(o);
2355         struct lu_object_header *h = o->lo_header;
2356         struct osp_xattr_entry *oxe;
2357         struct osp_xattr_entry *tmp;
2358         int                     count;
2359
2360         dt_object_fini(&obj->opo_obj);
2361         if (h)
2362                 lu_object_header_fini(h);
2363         list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
2364                 list_del_init(&oxe->oxe_list);
2365                 count = atomic_read(&oxe->oxe_ref);
2366                 LASSERTF(count == 1,
2367                          "Still has %d users on the xattr entry %.*s\n",
2368                          count-1, (int)oxe->oxe_namelen, oxe->oxe_name);
2369
2370                 osp_oac_xattr_free(oxe);
2371         }
2372         OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed");
2373         call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu);
2374 }
2375
2376 /**
2377  * Implement OSP layer lu_object_operations::loo_object_release() interface.
2378  *
2379  * Cleanup (not free) the object.
2380  *
2381  * If it is a reserved object but failed to be created, or it is an OST
2382  * object, then mark the object as non-cached.
2383  *
2384  * \param[in] env       pointer to the thread context
2385  * \param[in] o         pointer to the OSP layer lu_object
2386  */
2387 static void osp_object_release(const struct lu_env *env, struct lu_object *o)
2388 {
2389         struct osp_object       *po = lu2osp_obj(o);
2390         struct osp_device       *d  = lu2osp_dev(o->lo_dev);
2391
2392         ENTRY;
2393
2394         /*
2395          * release reservation if object was declared but not created
2396          * this may require lu_object_put() in LOD
2397          */
2398         if (unlikely(po->opo_reserved)) {
2399                 LASSERT(d->opd_pre != NULL);
2400                 LASSERT(d->opd_pre_reserved > 0);
2401                 spin_lock(&d->opd_pre_lock);
2402                 d->opd_pre_reserved--;
2403                 spin_unlock(&d->opd_pre_lock);
2404
2405                 /*
2406                  * Check that osp_precreate_cleanup_orphans is not blocked
2407                  * due to opd_pre_reserved > 0.
2408                  */
2409                 if (unlikely(d->opd_pre_reserved == 0 &&
2410                              (d->opd_pre_recovering || d->opd_pre_status)))
2411                         wake_up(&d->opd_pre_waitq);
2412
2413                 /* not needed in cache any more */
2414                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
2415         }
2416
2417         if (is_ost_obj(o))
2418                 /* XXX: Currently, NOT cache OST-object on MDT because:
2419                  *      1. it is not often accessed on MDT.
2420                  *      2. avoid up layer (such as LFSCK) to load too many
2421                  *         once-used OST-objects. */
2422                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
2423
2424         EXIT;
2425 }
2426
2427 static int osp_object_print(const struct lu_env *env, void *cookie,
2428                             lu_printer_t p, const struct lu_object *l)
2429 {
2430         const struct osp_object *o = lu2osp_obj((struct lu_object *)l);
2431
2432         return (*p)(env, cookie, LUSTRE_OSP_NAME"-object@%p", o);
2433 }
2434
2435 static int osp_object_invariant(const struct lu_object *o)
2436 {
2437         LBUG();
2438 }
2439
2440 const struct lu_object_operations osp_lu_obj_ops = {
2441         .loo_object_init        = osp_object_init,
2442         .loo_object_free        = osp_object_free,
2443         .loo_object_release     = osp_object_release,
2444         .loo_object_print       = osp_object_print,
2445         .loo_object_invariant   = osp_object_invariant
2446 };