Whamcloud - gitweb
LU-6376 osp: add RPC lock during RPC send
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Add OUT_CREATE sub-request into the OUT RPC.
64  *
65  * Note: if the object has already been created, we must add object
66  * destroy sub-request ahead of the create, so it will destroy then
67  * re-create the object.
68  *
69  * \param[in] env       execution environment
70  * \param[in] dt        object to be created
71  * \param[in] attr      attribute of the created object
72  * \param[in] hint      creation hint
73  * \param[in] dof       creation format information
74  * \param[in] th        the transaction handle
75  *
76  * \retval              only return 0 for now
77  */
78 static int __osp_md_declare_object_create(const struct lu_env *env,
79                                           struct dt_object *dt,
80                                           struct lu_attr *attr,
81                                           struct dt_allocation_hint *hint,
82                                           struct dt_object_format *dof,
83                                           struct thandle *th)
84 {
85         struct dt_update_request        *update;
86         int                             rc;
87
88         update = dt_update_request_find_or_create(th, dt);
89         if (IS_ERR(update)) {
90                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
91                        dt->do_lu.lo_dev->ld_obd->obd_name,
92                        (int)PTR_ERR(update));
93                 return PTR_ERR(update);
94         }
95
96         if (lu_object_exists(&dt->do_lu)) {
97                 /* If the object already exists, we needs to destroy
98                  * this orphan object first.
99                  *
100                  * The scenario might happen in this case
101                  *
102                  * 1. client send remote create to MDT0.
103                  * 2. MDT0 send create update to MDT1.
104                  * 3. MDT1 finished create synchronously.
105                  * 4. MDT0 failed and reboot.
106                  * 5. client resend remote create to MDT0.
107                  * 6. MDT0 tries to resend create update to MDT1,
108                  *    but find the object already exists
109                  */
110                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
111                        dt->do_lu.lo_dev->ld_obd->obd_name,
112                        PFID(lu_object_fid(&dt->do_lu)));
113
114                 rc = out_ref_del_pack(env, &update->dur_buf,
115                                       lu_object_fid(&dt->do_lu),
116                                       update->dur_batchid);
117                 if (rc != 0)
118                         GOTO(out, rc);
119
120                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
121                         /* decrease for ".." */
122                         rc = out_ref_del_pack(env, &update->dur_buf,
123                                               lu_object_fid(&dt->do_lu),
124                                               update->dur_batchid);
125                         if (rc != 0)
126                                 GOTO(out, rc);
127                 }
128
129                 rc = out_object_destroy_pack(env, &update->dur_buf,
130                                              lu_object_fid(&dt->do_lu),
131                                              update->dur_batchid);
132                 if (rc != 0)
133                         GOTO(out, rc);
134
135                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
136                 /* Increase batchid to add this orphan object deletion
137                  * to separate transaction */
138                 update_inc_batchid(update);
139         }
140
141         rc = out_create_pack(env, &update->dur_buf,
142                              lu_object_fid(&dt->do_lu), attr, hint, dof,
143                              update->dur_batchid);
144         if (rc != 0)
145                 GOTO(out, rc);
146 out:
147         if (rc)
148                 CERROR("%s: Insert update error: rc = %d\n",
149                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
150
151         return rc;
152 }
153
154 /**
155  * Implementation of dt_object_operations::do_declare_create
156  *
157  * For non-remote transaction, it will add an OUT_CREATE sub-request
158  * into the OUT RPC that will be flushed when the transaction start.
159  *
160  * \param[in] env       execution environment
161  * \param[in] dt        remote object to be created
162  * \param[in] attr      attribute of the created object
163  * \param[in] hint      creation hint
164  * \param[in] dof       creation format information
165  * \param[in] th        the transaction handle
166  *
167  * \retval              0 if the insertion succeeds.
168  * \retval              negative errno if the insertion fails.
169  */
170 int osp_md_declare_object_create(const struct lu_env *env,
171                                  struct dt_object *dt,
172                                  struct lu_attr *attr,
173                                  struct dt_allocation_hint *hint,
174                                  struct dt_object_format *dof,
175                                  struct thandle *th)
176 {
177         int rc = 0;
178
179         if (!is_only_remote_trans(th)) {
180                 rc = __osp_md_declare_object_create(env, dt, attr, hint,
181                                                     dof, th);
182
183                 CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n",
184                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
185         }
186
187         return rc;
188 }
189
190 /**
191  * Implementation of dt_object_operations::do_create
192  *
193  * For remote transaction, it will add an OUT_CREATE sub-request into
194  * the OUT RPC that will be flushed when the transaction stop.
195  *
196  * It sets necessary flags for created object. In DNE phase I,
197  * remote updates are actually executed during transaction start,
198  * i.e. the object has already been created when calling this method.
199  *
200  * \param[in] env       execution environment
201  * \param[in] dt        object to be created
202  * \param[in] attr      attribute of the created object
203  * \param[in] hint      creation hint
204  * \param[in] dof       creation format information
205  * \param[in] th        the transaction handle
206  *
207  * \retval              only return 0 for now
208  */
209 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
210                          struct lu_attr *attr, struct dt_allocation_hint *hint,
211                          struct dt_object_format *dof, struct thandle *th)
212 {
213         int rc = 0;
214
215         if (is_only_remote_trans(th)) {
216                 rc = __osp_md_declare_object_create(env, dt, attr, hint,
217                                                     dof, th);
218
219                 CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n",
220                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
221         }
222
223         if (rc == 0) {
224                 dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS |
225                                                  (attr->la_mode & S_IFMT);
226                 dt2osp_obj(dt)->opo_non_exist = 0;
227         }
228
229         return rc;
230 }
231
232 /**
233  * Add OUT_REF_DEL sub-request into the OUT RPC.
234  *
235  * \param[in] env       execution environment
236  * \param[in] dt        object to decrease the reference count.
237  * \param[in] th        the transaction handle of refcount decrease.
238  *
239  * \retval              0 if the insertion succeeds.
240  * \retval              negative errno if the insertion fails.
241  */
242 static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
243                             struct thandle *th)
244 {
245         struct dt_update_request        *update;
246         int                             rc;
247
248         update = dt_update_request_find_or_create(th, dt);
249         if (IS_ERR(update)) {
250                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
251                        dt->do_lu.lo_dev->ld_obd->obd_name,
252                       (int)PTR_ERR(update));
253                 return PTR_ERR(update);
254         }
255
256         rc = out_ref_del_pack(env, &update->dur_buf,
257                               lu_object_fid(&dt->do_lu),
258                               update->dur_batchid);
259         return rc;
260 }
261
262 /**
263  * Implementation of dt_object_operations::do_declare_ref_del
264  *
265  * For non-remote transaction, it will add an OUT_REF_DEL sub-request
266  * into the OUT RPC that will be flushed when the transaction start.
267  *
268  * \param[in] env       execution environment
269  * \param[in] dt        object to decrease the reference count.
270  * \param[in] th        the transaction handle of refcount decrease.
271  *
272  * \retval              0 if the insertion succeeds.
273  * \retval              negative errno if the insertion fails.
274  */
275 static int osp_md_declare_ref_del(const struct lu_env *env,
276                                   struct dt_object *dt, struct thandle *th)
277 {
278         int rc = 0;
279
280         if (!is_only_remote_trans(th)) {
281                 rc = __osp_md_ref_del(env, dt, th);
282
283                 CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n",
284                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
285         }
286
287         return rc;
288 }
289
290 /**
291  * Implementation of dt_object_operations::do_ref_del
292  *
293  * For remote transaction, it will add an OUT_REF_DEL sub-request into
294  * the OUT RPC that will be flushed when the transaction stop.
295  *
296  * \param[in] env       execution environment
297  * \param[in] dt        object to decrease the reference count
298  * \param[in] th        the transaction handle
299  *
300  * \retval              only return 0 for now
301  */
302 static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
303                           struct thandle *th)
304 {
305         int rc = 0;
306
307         if (is_only_remote_trans(th)) {
308                 rc = __osp_md_ref_del(env, dt, th);
309
310                 CDEBUG(D_INFO, "ref del "DFID": rc = %d\n",
311                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
312         }
313
314         return rc;
315 }
316
317 /**
318  * Add OUT_REF_ADD sub-request into the OUT RPC.
319  *
320  * \param[in] env       execution environment
321  * \param[in] dt        object on which to increase the reference count.
322  * \param[in] th        the transaction handle.
323  *
324  * \retval              0 if the insertion succeeds.
325  * \retval              negative errno if the insertion fails.
326  */
327 static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
328                             struct thandle *th)
329 {
330         struct dt_update_request        *update;
331         int                             rc;
332
333         update = dt_update_request_find_or_create(th, dt);
334         if (IS_ERR(update)) {
335                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
336                        dt->do_lu.lo_dev->ld_obd->obd_name,
337                        (int)PTR_ERR(update));
338                 return PTR_ERR(update);
339         }
340
341         rc = out_ref_add_pack(env, &update->dur_buf,
342                               lu_object_fid(&dt->do_lu),
343                               update->dur_batchid);
344
345         return rc;
346 }
347
348 /**
349  * Implementation of dt_object_operations::do_declare_ref_del
350  *
351  * For non-remote transaction, it will add an OUT_REF_ADD sub-request
352  * into the OUT RPC that will be flushed when the transaction start.
353  *
354  * \param[in] env       execution environment
355  * \param[in] dt        object on which to increase the reference count.
356  * \param[in] th        the transaction handle.
357  *
358  * \retval              0 if the insertion succeeds.
359  * \retval              negative errno if the insertion fails.
360  */
361 static int osp_md_declare_ref_add(const struct lu_env *env,
362                                   struct dt_object *dt, struct thandle *th)
363 {
364         int rc = 0;
365
366         if (!is_only_remote_trans(th)) {
367                 rc = __osp_md_ref_add(env, dt, th);
368
369                 CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n",
370                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
371         }
372
373         return rc;
374 }
375
376 /**
377  * Implementation of dt_object_operations::do_ref_add
378  *
379  * For remote transaction, it will add an OUT_REF_ADD sub-request into
380  * the OUT RPC that will be flushed when the transaction stop.
381  *
382  * \param[in] env       execution environment
383  * \param[in] dt        object on which to increase the reference count
384  * \param[in] th        the transaction handle
385  *
386  * \retval              only return 0 for now
387  */
388 static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
389                           struct thandle *th)
390 {
391         int rc = 0;
392
393         if (is_only_remote_trans(th)) {
394                 rc = __osp_md_ref_add(env, dt, th);
395
396                 CDEBUG(D_INFO, "ref add "DFID": rc = %d\n",
397                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
398         }
399
400         return rc;
401 }
402
403 /**
404  * Implementation of dt_object_operations::do_ah_init
405  *
406  * Initialize the allocation hint for object creation, which is usually called
407  * before the creation, and these hints (parent and child mode) will be sent to
408  * the remote Object Update Target (OUT) and used in the object create process,
409  * same as OSD object creation.
410  *
411  * \param[in] env       execution environment
412  * \param[in] ah        the hint to be initialized
413  * \param[in] parent    the parent of the object
414  * \param[in] child     the object to be created
415  * \param[in] child_mode the mode of the created object
416  */
417 static void osp_md_ah_init(const struct lu_env *env,
418                            struct dt_allocation_hint *ah,
419                            struct dt_object *parent,
420                            struct dt_object *child,
421                            umode_t child_mode)
422 {
423         LASSERT(ah);
424
425         ah->dah_parent = parent;
426         ah->dah_mode = child_mode;
427 }
428
429 /**
430  * Add OUT_ATTR_SET sub-request into the OUT RPC.
431  *
432  * \param[in] env       execution environment
433  * \param[in] dt        object on which to set attributes
434  * \param[in] attr      attributes to be set
435  * \param[in] th        the transaction handle
436  *
437  * \retval              0 if the insertion succeeds.
438  * \retval              negative errno if the insertion fails.
439  */
440 int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
441                       const struct lu_attr *attr, struct thandle *th)
442 {
443         struct dt_update_request        *update;
444         int                             rc;
445
446         update = dt_update_request_find_or_create(th, dt);
447         if (IS_ERR(update)) {
448                 CERROR("%s: Get OSP update buf failed: %d\n",
449                        dt->do_lu.lo_dev->ld_obd->obd_name,
450                        (int)PTR_ERR(update));
451                 return PTR_ERR(update);
452         }
453
454         rc = out_attr_set_pack(env, &update->dur_buf,
455                                lu_object_fid(&dt->do_lu), attr,
456                                update->dur_batchid);
457
458         return rc;
459 }
460
461 /**
462  * Implementation of dt_object_operations::do_declare_attr_get
463  *
464  * Declare setting attributes to the specified remote object.
465  *
466  * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET
467  * sub-request into the OUT RPC that will be flushed when the transaction start.
468  *
469  * \param[in] env       execution environment
470  * \param[in] dt        object on which to set attributes
471  * \param[in] attr      attributes to be set
472  * \param[in] th        the transaction handle
473  *
474  * \retval              0 if the insertion succeeds.
475  * \retval              negative errno if the insertion fails.
476  */
477 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
478                             const struct lu_attr *attr, struct thandle *th)
479 {
480         int rc = 0;
481
482         if (!is_only_remote_trans(th)) {
483                 rc = __osp_md_attr_set(env, dt, attr, th);
484
485                 CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n",
486                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
487         }
488
489         return rc;
490 }
491
492 /**
493  * Implementation of dt_object_operations::do_attr_set
494  *
495  * Set attributes to the specified remote object.
496  *
497  * If the transaction is a remote transaction, then add the OUT_ATTR_SET
498  * sub-request into the OUT RPC that will be flushed when the transaction stop.
499  *
500  * \param[in] env       execution environment
501  * \param[in] dt        object to set attributes
502  * \param[in] attr      attributes to be set
503  * \param[in] th        the transaction handle
504  * \param[in] capa      capability of setting attributes (not yet implemented).
505  *
506  * \retval              only return 0 for now
507  */
508 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
509                     const struct lu_attr *attr, struct thandle *th,
510                     struct lustre_capa *capa)
511 {
512         int rc = 0;
513
514         if (is_only_remote_trans(th)) {
515                 rc = __osp_md_attr_set(env, dt, attr, th);
516
517                 CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n",
518                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
519         }
520
521         return rc;
522 }
523
524 /**
525  * Implementation of dt_object_operations::do_read_lock
526  *
527  * osp_md_object_{read,write}_lock() will only lock the remote object in the
528  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
529  * lock the object. Note: it will not lock the object in the whole cluster,
530  * which relies on the LDLM lock.
531  *
532  * \param[in] env       execution environment
533  * \param[in] dt        object to be locked
534  * \param[in] role      lock role from MDD layer, see mdd_object_role().
535  */
536 static void osp_md_object_read_lock(const struct lu_env *env,
537                                     struct dt_object *dt, unsigned role)
538 {
539         struct osp_object  *obj = dt2osp_obj(dt);
540
541         LASSERT(obj->opo_owner != env);
542         down_read_nested(&obj->opo_sem, role);
543
544         LASSERT(obj->opo_owner == NULL);
545 }
546
547 /**
548  * Implementation of dt_object_operations::do_write_lock
549  *
550  * Lock the remote object in write mode.
551  *
552  * \param[in] env       execution environment
553  * \param[in] dt        object to be locked
554  * \param[in] role      lock role from MDD layer, see mdd_object_role().
555  */
556 static void osp_md_object_write_lock(const struct lu_env *env,
557                                      struct dt_object *dt, unsigned role)
558 {
559         struct osp_object *obj = dt2osp_obj(dt);
560
561         down_write_nested(&obj->opo_sem, role);
562
563         LASSERT(obj->opo_owner == NULL);
564         obj->opo_owner = env;
565 }
566
567 /**
568  * Implementation of dt_object_operations::do_read_unlock
569  *
570  * Unlock the read lock of remote object.
571  *
572  * \param[in] env       execution environment
573  * \param[in] dt        object to be unlocked
574  */
575 static void osp_md_object_read_unlock(const struct lu_env *env,
576                                       struct dt_object *dt)
577 {
578         struct osp_object *obj = dt2osp_obj(dt);
579
580         up_read(&obj->opo_sem);
581 }
582
583 /**
584  * Implementation of dt_object_operations::do_write_unlock
585  *
586  * Unlock the write lock of remote object.
587  *
588  * \param[in] env       execution environment
589  * \param[in] dt        object to be unlocked
590  */
591 static void osp_md_object_write_unlock(const struct lu_env *env,
592                                        struct dt_object *dt)
593 {
594         struct osp_object *obj = dt2osp_obj(dt);
595
596         LASSERT(obj->opo_owner == env);
597         obj->opo_owner = NULL;
598         up_write(&obj->opo_sem);
599 }
600
601 /**
602  * Implementation of dt_object_operations::do_write_locked
603  *
604  * Test if the object is locked in write mode.
605  *
606  * \param[in] env       execution environment
607  * \param[in] dt        object to be tested
608  */
609 static int osp_md_object_write_locked(const struct lu_env *env,
610                                       struct dt_object *dt)
611 {
612         struct osp_object *obj = dt2osp_obj(dt);
613
614         return obj->opo_owner == env;
615 }
616
617 /**
618  * Implementation of dt_index_operations::dio_lookup
619  *
620  * Look up record by key under a remote index object. It packs lookup update
621  * into RPC, sends to the remote OUT and waits for the lookup result.
622  *
623  * \param[in] env       execution environment
624  * \param[in] dt        index object to lookup
625  * \param[out] rec      record in which to return lookup result
626  * \param[in] key       key of index which will be looked up
627  * \param[in] capa      capability of lookup (not yet implemented)
628  *
629  * \retval              1 if the lookup succeeds.
630  * \retval              negative errno if the lookup fails.
631  */
632 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
633                                struct dt_rec *rec, const struct dt_key *key,
634                                struct lustre_capa *capa)
635 {
636         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
637         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
638         struct dt_device        *dt_dev = &osp->opd_dt_dev;
639         struct dt_update_request   *update;
640         struct object_update_reply *reply;
641         struct ptlrpc_request      *req = NULL;
642         struct lu_fid              *fid;
643         int                        rc;
644         ENTRY;
645
646         /* Because it needs send the update buffer right away,
647          * just create an update buffer, instead of attaching the
648          * update_remote list of the thandle.
649          */
650         update = dt_update_request_create(dt_dev);
651         if (IS_ERR(update))
652                 RETURN(PTR_ERR(update));
653
654         rc = out_index_lookup_pack(env, &update->dur_buf,
655                                    lu_object_fid(&dt->do_lu), rec, key);
656         if (rc != 0) {
657                 CERROR("%s: Insert update error: rc = %d\n",
658                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
659                 GOTO(out, rc);
660         }
661
662         rc = osp_remote_sync(env, osp, update, &req, false);
663         if (rc < 0)
664                 GOTO(out, rc);
665
666         reply = req_capsule_server_sized_get(&req->rq_pill,
667                                              &RMF_OUT_UPDATE_REPLY,
668                                              OUT_UPDATE_REPLY_SIZE);
669         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
670                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
671                        dt_dev->dd_lu_dev.ld_obd->obd_name,
672                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
673                 GOTO(out, rc = -EPROTO);
674         }
675
676         rc = object_update_result_data_get(reply, lbuf, 0);
677         if (rc < 0)
678                 GOTO(out, rc);
679
680         if (lbuf->lb_len != sizeof(*fid)) {
681                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
682                        dt_dev->dd_lu_dev.ld_obd->obd_name,
683                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
684                        (int)lbuf->lb_len);
685                 GOTO(out, rc = -EINVAL);
686         }
687
688         fid = lbuf->lb_buf;
689         if (ptlrpc_rep_need_swab(req))
690                 lustre_swab_lu_fid(fid);
691         if (!fid_is_sane(fid)) {
692                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
693                        dt_dev->dd_lu_dev.ld_obd->obd_name,
694                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
695                 GOTO(out, rc = -EINVAL);
696         }
697
698         memcpy(rec, fid, sizeof(*fid));
699
700         GOTO(out, rc = 1);
701
702 out:
703         if (req != NULL)
704                 ptlrpc_req_finished(req);
705
706         dt_update_request_destroy(update);
707
708         return rc;
709 }
710
711 /**
712  * Add OUT_INDEX_INSERT sub-request into the OUT RPC.
713  *
714  * \param[in] env       execution environment
715  * \param[in] dt        object for which to insert index
716  * \param[in] rec       record of the index which will be inserted
717  * \param[in] key       key of the index which will be inserted
718  * \param[in] th        the transaction handle
719  *
720  * \retval              0 if the insertion succeeds.
721  * \retval              negative errno if the insertion fails.
722  */
723 static int __osp_md_index_insert(const struct lu_env *env,
724                                  struct dt_object *dt,
725                                  const struct dt_rec *rec,
726                                  const struct dt_key *key,
727                                  struct thandle *th)
728 {
729         struct dt_update_request *update;
730         int                      rc;
731
732         update = dt_update_request_find_or_create(th, dt);
733         if (IS_ERR(update)) {
734                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
735                        dt->do_lu.lo_dev->ld_obd->obd_name,
736                        (int)PTR_ERR(update));
737                 return PTR_ERR(update);
738         }
739
740         rc = out_index_insert_pack(env, &update->dur_buf,
741                                    lu_object_fid(&dt->do_lu), rec, key,
742                                    update->dur_batchid);
743         return rc;
744 }
745
746 /**
747  * Implementation of dt_index_operations::dio_declare_insert
748  *
749  * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request
750  * into the OUT RPC that will be flushed when the transaction start.
751  *
752  * \param[in] env       execution environment
753  * \param[in] dt        object for which to insert index
754  * \param[in] rec       record of the index which will be inserted
755  * \param[in] key       key of the index which will be inserted
756  * \param[in] th        the transaction handle
757  *
758  * \retval              0 if the insertion succeeds.
759  * \retval              negative errno if the insertion fails.
760  */
761 static int osp_md_declare_index_insert(const struct lu_env *env,
762                                        struct dt_object *dt,
763                                        const struct dt_rec *rec,
764                                        const struct dt_key *key,
765                                        struct thandle *th)
766 {
767         int rc = 0;
768
769         if (!is_only_remote_trans(th)) {
770                 rc = __osp_md_index_insert(env, dt, rec, key, th);
771
772                 CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID
773                        ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
774                        (char *)key,
775                        PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
776         }
777
778         return rc;
779 }
780
781 /**
782  * Implementation of dt_index_operations::dio_insert
783  *
784  * For remote transaction, it will add an OUT_INDEX_INSERT sub-request
785  * into the OUT RPC that will be flushed when the transaction stop.
786  *
787  * \param[in] env       execution environment
788  * \param[in] dt        object for which to insert index
789  * \param[in] rec       record of the index to be inserted
790  * \param[in] key       key of the index to be inserted
791  * \param[in] th        the transaction handle
792  * \param[in] capa      capability of insert (not yet implemented)
793  * \param[in] ignore_quota quota enforcement for insert
794  *
795  * \retval              only return 0 for now
796  */
797 static int osp_md_index_insert(const struct lu_env *env,
798                                struct dt_object *dt,
799                                const struct dt_rec *rec,
800                                const struct dt_key *key,
801                                struct thandle *th,
802                                struct lustre_capa *capa,
803                                int ignore_quota)
804 {
805         int rc = 0;
806
807         if (is_only_remote_trans(th)) {
808                 rc = __osp_md_index_insert(env, dt, rec, key, th);
809
810                 CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID
811                        ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
812                        (char *)key,
813                        PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
814         }
815
816         return rc;
817 }
818
819 /**
820  * Add OUT_INDEX_DELETE sub-request into the OUT RPC.
821  *
822  * \param[in] env       execution environment
823  * \param[in] dt        object for which to delete index
824  * \param[in] key       key of the index
825  * \param[in] th        the transaction handle
826  *
827  * \retval              0 if the insertion succeeds.
828  * \retval              negative errno if the insertion fails.
829  */
830 static int __osp_md_index_delete(const struct lu_env *env,
831                                  struct dt_object *dt,
832                                  const struct dt_key *key,
833                                  struct thandle *th)
834 {
835         struct dt_update_request *update;
836         int                      rc;
837
838         update = dt_update_request_find_or_create(th, dt);
839         if (IS_ERR(update)) {
840                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
841                        dt->do_lu.lo_dev->ld_obd->obd_name,
842                        (int)PTR_ERR(update));
843                 return PTR_ERR(update);
844         }
845
846         rc = out_index_delete_pack(env, &update->dur_buf,
847                                    lu_object_fid(&dt->do_lu), key,
848                                    update->dur_batchid);
849         return rc;
850 }
851
852 /**
853  * Implementation of dt_index_operations::dio_declare_delete
854  *
855  * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request
856  * into the OUT RPC that will be flushed when the transaction start.
857  *
858  * \param[in] env       execution environment
859  * \param[in] dt        object for which to delete index
860  * \param[in] key       key of the index
861  * \param[in] th        the transaction handle
862  *
863  * \retval              0 if the insertion succeeds.
864  * \retval              negative errno if the insertion fails.
865  */
866 static int osp_md_declare_index_delete(const struct lu_env *env,
867                                        struct dt_object *dt,
868                                        const struct dt_key *key,
869                                        struct thandle *th)
870 {
871         int rc = 0;
872
873         if (!is_only_remote_trans(th)) {
874                 rc = __osp_md_index_delete(env, dt, key, th);
875
876                 CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n",
877                        PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
878         }
879
880         return rc;
881 }
882
883 /**
884  * Implementation of dt_index_operations::dio_delete
885  *
886  * For remote transaction, it will add an OUT_INDEX_DELETE sub-request
887  * into the OUT RPC that will be flushed when the transaction stop.
888  *
889  * \param[in] env       execution environment
890  * \param[in] dt        object for which to delete index
891  * \param[in] key       key of the index which will be deleted
892  * \param[in] th        the transaction handle
893  * \param[in] capa      capability of delete (not yet implemented)
894  *
895  * \retval              only return 0 for now
896  */
897 static int osp_md_index_delete(const struct lu_env *env,
898                                struct dt_object *dt,
899                                const struct dt_key *key,
900                                struct thandle *th,
901                                struct lustre_capa *capa)
902 {
903         int rc = 0;
904
905         if (is_only_remote_trans(th)) {
906                 rc = __osp_md_index_delete(env, dt, key, th);
907
908                 CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n",
909                        PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
910         }
911
912         return rc;
913 }
914
915 /**
916  * Implementation of dt_index_operations::dio_it.next
917  *
918  * Advance the pointer of the iterator to the next entry. It shares a similar
919  * internal implementation with osp_orphan_it_next(), which is being used for
920  * remote orphan index object. This method will be used for remote directory.
921  *
922  * \param[in] env       execution environment
923  * \param[in] di        iterator of this iteration
924  *
925  * \retval              0 if the pointer is advanced successfuly.
926  * \retval              1 if it reaches to the end of the index object.
927  * \retval              negative errno if the pointer cannot be advanced.
928  */
929 static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
930 {
931         struct osp_it           *it = (struct osp_it *)di;
932         struct lu_idxpage       *idxpage;
933         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
934         int                     rc;
935         ENTRY;
936
937 again:
938         idxpage = it->ooi_cur_idxpage;
939         if (idxpage != NULL) {
940                 if (idxpage->lip_nr == 0)
941                         RETURN(1);
942
943                 it->ooi_pos_ent++;
944                 if (ent == NULL) {
945                         it->ooi_ent =
946                               (struct lu_dirent *)idxpage->lip_entries;
947                         RETURN(0);
948                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
949                            it->ooi_pos_ent < idxpage->lip_nr) {
950                         ent = (struct lu_dirent *)(((char *)ent) +
951                                         le16_to_cpu(ent->lde_reclen));
952                         it->ooi_ent = ent;
953                         RETURN(0);
954                 } else {
955                         it->ooi_ent = NULL;
956                 }
957         }
958
959         rc = osp_it_next_page(env, di);
960         if (rc == 0)
961                 goto again;
962
963         RETURN(rc);
964 }
965
966 /**
967  * Implementation of dt_index_operations::dio_it.key
968  *
969  * Get the key at current iterator poisiton. These iteration methods
970  * (dio_it) will only be used for iterating the remote directory, so
971  * the key is the name of the directory entry.
972  *
973  * \param[in] env       execution environment
974  * \param[in] di        iterator of this iteration
975  *
976  * \retval              name of the current entry
977  */
978 static struct dt_key *osp_it_key(const struct lu_env *env,
979                                  const struct dt_it *di)
980 {
981         struct osp_it           *it = (struct osp_it *)di;
982         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
983
984         return (struct dt_key *)ent->lde_name;
985 }
986
987 /**
988  * Implementation of dt_index_operations::dio_it.key_size
989  *
990  * Get the key size at current iterator poisiton. These iteration methods
991  * (dio_it) will only be used for iterating the remote directory, so the key
992  * size is the name size of the directory entry.
993  *
994  * \param[in] env       execution environment
995  * \param[in] di        iterator of this iteration
996  *
997  * \retval              name size of the current entry
998  */
999
1000 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
1001 {
1002         struct osp_it           *it = (struct osp_it *)di;
1003         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
1004
1005         return (int)le16_to_cpu(ent->lde_namelen);
1006 }
1007
1008 /**
1009  * Implementation of dt_index_operations::dio_it.rec
1010  *
1011  * Get the record at current iterator position. These iteration methods
1012  * (dio_it) will only be used for iterating the remote directory, so it
1013  * uses lu_dirent_calc_size() to calculate the record size.
1014  *
1015  * \param[in] env       execution environment
1016  * \param[in] di        iterator of this iteration
1017  * \param[out] rec      the record to be returned
1018  * \param[in] attr      attributes of the index object, so it knows
1019  *                      how to pack the entry.
1020  *
1021  * \retval              only return 0 for now
1022  */
1023 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1024                                struct dt_rec *rec, __u32 attr)
1025 {
1026         struct osp_it           *it = (struct osp_it *)di;
1027         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
1028         size_t                  reclen;
1029
1030         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
1031         memcpy(rec, ent, reclen);
1032         return 0;
1033 }
1034
1035 /**
1036  * Implementation of dt_index_operations::dio_it.load
1037  *
1038  * Locate the iteration cursor to the specified position (cookie).
1039  *
1040  * \param[in] env       pointer to the thread context
1041  * \param[in] di        pointer to the iteration structure
1042  * \param[in] hash      the specified position
1043  *
1044  * \retval              positive number for locating to the exactly position
1045  *                      or the next
1046  * \retval              0 for arriving at the end of the iteration
1047  * \retval              negative error number on failure
1048  */
1049 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
1050                        __u64 hash)
1051 {
1052         struct osp_it   *it     = (struct osp_it *)di;
1053         int              rc;
1054
1055         it->ooi_next = hash;
1056         rc = osp_md_index_it_next(env, (struct dt_it *)di);
1057         if (rc == 1)
1058                 return 0;
1059
1060         if (rc == 0)
1061                 return 1;
1062
1063         return rc;
1064 }
1065
1066 const struct dt_index_operations osp_md_index_ops = {
1067         .dio_lookup         = osp_md_index_lookup,
1068         .dio_declare_insert = osp_md_declare_index_insert,
1069         .dio_insert         = osp_md_index_insert,
1070         .dio_declare_delete = osp_md_declare_index_delete,
1071         .dio_delete         = osp_md_index_delete,
1072         .dio_it     = {
1073                 .init     = osp_it_init,
1074                 .fini     = osp_it_fini,
1075                 .get      = osp_it_get,
1076                 .put      = osp_it_put,
1077                 .next     = osp_md_index_it_next,
1078                 .key      = osp_it_key,
1079                 .key_size = osp_it_key_size,
1080                 .rec      = osp_md_index_it_rec,
1081                 .store    = osp_it_store,
1082                 .load     = osp_it_load,
1083                 .key_rec  = osp_it_key_rec,
1084         }
1085 };
1086
1087 /**
1088  * Implementation of dt_object_operations::do_index_try
1089  *
1090  * Try to initialize the index API pointer for the given object. This
1091  * is the entry point of the index API, i.e. we must call this method
1092  * to initialize the index object before calling other index methods.
1093  *
1094  * \param[in] env       execution environment
1095  * \param[in] dt        index object to be initialized
1096  * \param[in] feat      the index feature of the object
1097  *
1098  * \retval              0 if the initialization succeeds.
1099  * \retval              negative errno if the initialization fails.
1100  */
1101 static int osp_md_index_try(const struct lu_env *env,
1102                             struct dt_object *dt,
1103                             const struct dt_index_features *feat)
1104 {
1105         dt->do_index_ops = &osp_md_index_ops;
1106         return 0;
1107 }
1108
1109 /**
1110  * Implementation of dt_object_operations::do_object_lock
1111  *
1112  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
1113  * which will lock the object in the global namespace.
1114  *
1115  * \param[in] env       execution environment
1116  * \param[in] dt        object to be locked
1117  * \param[out] lh       lock handle
1118  * \param[in] einfo     enqueue information
1119  * \param[in] policy    lock policy
1120  *
1121  * \retval              ELDLM_OK if locking the object succeeds.
1122  * \retval              negative errno if locking fails.
1123  */
1124 static int osp_md_object_lock(const struct lu_env *env,
1125                               struct dt_object *dt,
1126                               struct lustre_handle *lh,
1127                               struct ldlm_enqueue_info *einfo,
1128                               ldlm_policy_data_t *policy)
1129 {
1130         struct ldlm_res_id      *res_id;
1131         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1132         struct osp_device       *osp = dt2osp_dev(dt_dev);
1133         struct ptlrpc_request   *req;
1134         int                     rc = 0;
1135         __u64                   flags = 0;
1136         ldlm_mode_t             mode;
1137
1138         res_id = einfo->ei_res_id;
1139         LASSERT(res_id != NULL);
1140
1141         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
1142                                LDLM_FL_BLOCK_GRANTED, res_id,
1143                                einfo->ei_type, policy,
1144                                einfo->ei_mode, lh, 0);
1145         if (mode > 0)
1146                 return ELDLM_OK;
1147
1148         req = ldlm_enqueue_pack(osp->opd_exp, 0);
1149         if (IS_ERR(req))
1150                 RETURN(PTR_ERR(req));
1151
1152         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
1153                               (const ldlm_policy_data_t *)policy,
1154                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
1155
1156         ptlrpc_req_finished(req);
1157
1158         return rc == ELDLM_OK ? 0 : -EIO;
1159 }
1160
1161 /**
1162  * Implementation of dt_object_operations::do_object_unlock
1163  *
1164  * Cancel a lock of a remote object.
1165  *
1166  * \param[in] env       execution environment
1167  * \param[in] dt        object to be unlocked
1168  * \param[in] einfo     lock enqueue information
1169  * \param[in] policy    lock policy
1170  *
1171  * \retval              Only return 0 for now.
1172  */
1173 static int osp_md_object_unlock(const struct lu_env *env,
1174                                 struct dt_object *dt,
1175                                 struct ldlm_enqueue_info *einfo,
1176                                 ldlm_policy_data_t *policy)
1177 {
1178         struct lustre_handle    *lockh = einfo->ei_cbdata;
1179
1180         /* unlock finally */
1181         ldlm_lock_decref(lockh, einfo->ei_mode);
1182
1183         return 0;
1184 }
1185
1186 struct dt_object_operations osp_md_obj_ops = {
1187         .do_read_lock         = osp_md_object_read_lock,
1188         .do_write_lock        = osp_md_object_write_lock,
1189         .do_read_unlock       = osp_md_object_read_unlock,
1190         .do_write_unlock      = osp_md_object_write_unlock,
1191         .do_write_locked      = osp_md_object_write_locked,
1192         .do_declare_create    = osp_md_declare_object_create,
1193         .do_create            = osp_md_object_create,
1194         .do_declare_ref_add   = osp_md_declare_ref_add,
1195         .do_ref_add           = osp_md_ref_add,
1196         .do_declare_ref_del   = osp_md_declare_ref_del,
1197         .do_ref_del           = osp_md_ref_del,
1198         .do_declare_destroy   = osp_declare_object_destroy,
1199         .do_destroy           = osp_object_destroy,
1200         .do_ah_init           = osp_md_ah_init,
1201         .do_attr_get          = osp_attr_get,
1202         .do_declare_attr_set  = osp_md_declare_attr_set,
1203         .do_attr_set          = osp_md_attr_set,
1204         .do_xattr_get         = osp_xattr_get,
1205         .do_declare_xattr_set = osp_declare_xattr_set,
1206         .do_xattr_set         = osp_xattr_set,
1207         .do_declare_xattr_del = osp_declare_xattr_del,
1208         .do_xattr_del         = osp_xattr_del,
1209         .do_index_try         = osp_md_index_try,
1210         .do_object_lock       = osp_md_object_lock,
1211         .do_object_unlock     = osp_md_object_unlock,
1212 };
1213
1214 /**
1215  * Implementation of dt_body_operations::dbo_declare_write
1216  *
1217  * Declare an object write. In DNE phase I, it will pack the write
1218  * object update into the RPC.
1219  *
1220  * \param[in] env       execution environment
1221  * \param[in] dt        object to be written
1222  * \param[in] buf       buffer to write which includes an embedded size field
1223  * \param[in] pos       offet in the object to start writing at
1224  * \param[in] th        transaction handle
1225  *
1226  * \retval              0 if the insertion succeeds.
1227  * \retval              negative errno if the insertion fails.
1228  */
1229 static ssize_t osp_md_declare_write(const struct lu_env *env,
1230                                     struct dt_object *dt,
1231                                     const struct lu_buf *buf,
1232                                     loff_t pos, struct thandle *th)
1233 {
1234         struct dt_update_request  *update;
1235         ssize_t                   rc;
1236
1237         update = dt_update_request_find_or_create(th, dt);
1238         if (IS_ERR(update)) {
1239                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
1240                        dt->do_lu.lo_dev->ld_obd->obd_name,
1241                        (int)PTR_ERR(update));
1242                 return PTR_ERR(update);
1243         }
1244
1245         rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
1246                             buf, pos, update->dur_batchid);
1247
1248         return rc;
1249
1250 }
1251
1252 /**
1253  * Implementation of dt_body_operations::dbo_write
1254  *
1255  * Return the buffer size. In DNE phase I, remote updates
1256  * are actually executed during transaction start, the buffer has
1257  * already been written when this method is being called.
1258  *
1259  * \param[in] env       execution environment
1260  * \param[in] dt        object to be written
1261  * \param[in] buf       buffer to write which includes an embedded size field
1262  * \param[in] pos       offet in the object to start writing at
1263  * \param[in] th        transaction handle
1264  * \param[in] capa      capability of the write (not yet implemented)
1265  * \param[in] ignore_quota quota enforcement for this write
1266  *
1267  * \retval              the buffer size in bytes.
1268  */
1269 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1270                             const struct lu_buf *buf, loff_t *pos,
1271                             struct thandle *handle,
1272                             struct lustre_capa *capa, int ignore_quota)
1273 {
1274         *pos += buf->lb_len;
1275         return buf->lb_len;
1276 }
1277
1278 /* These body operation will be used to write symlinks during migration etc */
1279 struct dt_body_operations osp_md_body_ops = {
1280         .dbo_declare_write      = osp_md_declare_write,
1281         .dbo_write              = osp_md_write,
1282 };