Whamcloud - gitweb
c0314572e9493001a36cf350d5c72a095a055a8c
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Add OUT_CREATE sub-request into the OUT RPC.
64  *
65  * Note: if the object has already been created, we must add object
66  * destroy sub-request ahead of the create, so it will destroy then
67  * re-create the object.
68  *
69  * \param[in] env       execution environment
70  * \param[in] dt        object to be created
71  * \param[in] attr      attribute of the created object
72  * \param[in] hint      creation hint
73  * \param[in] dof       creation format information
74  * \param[in] th        the transaction handle
75  *
76  * \retval              only return 0 for now
77  */
78 static int __osp_md_declare_object_create(const struct lu_env *env,
79                                           struct dt_object *dt,
80                                           struct lu_attr *attr,
81                                           struct dt_allocation_hint *hint,
82                                           struct dt_object_format *dof,
83                                           struct thandle *th)
84 {
85         struct dt_update_request        *update;
86         int                             rc;
87
88         update = dt_update_request_find_or_create(th, dt);
89         if (IS_ERR(update)) {
90                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
91                        dt->do_lu.lo_dev->ld_obd->obd_name,
92                        (int)PTR_ERR(update));
93                 return PTR_ERR(update);
94         }
95
96         if (lu_object_exists(&dt->do_lu)) {
97                 /* If the object already exists, we needs to destroy
98                  * this orphan object first.
99                  *
100                  * The scenario might happen in this case
101                  *
102                  * 1. client send remote create to MDT0.
103                  * 2. MDT0 send create update to MDT1.
104                  * 3. MDT1 finished create synchronously.
105                  * 4. MDT0 failed and reboot.
106                  * 5. client resend remote create to MDT0.
107                  * 6. MDT0 tries to resend create update to MDT1,
108                  *    but find the object already exists
109                  */
110                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
111                        dt->do_lu.lo_dev->ld_obd->obd_name,
112                        PFID(lu_object_fid(&dt->do_lu)));
113
114                 rc = out_ref_del_pack(env, &update->dur_buf,
115                                       lu_object_fid(&dt->do_lu),
116                                       update->dur_batchid);
117                 if (rc != 0)
118                         GOTO(out, rc);
119
120                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
121                         /* decrease for ".." */
122                         rc = out_ref_del_pack(env, &update->dur_buf,
123                                               lu_object_fid(&dt->do_lu),
124                                               update->dur_batchid);
125                         if (rc != 0)
126                                 GOTO(out, rc);
127                 }
128
129                 rc = out_object_destroy_pack(env, &update->dur_buf,
130                                              lu_object_fid(&dt->do_lu),
131                                              update->dur_batchid);
132                 if (rc != 0)
133                         GOTO(out, rc);
134
135                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
136                 /* Increase batchid to add this orphan object deletion
137                  * to separate transaction */
138                 update_inc_batchid(update);
139         }
140
141         rc = out_create_pack(env, &update->dur_buf,
142                              lu_object_fid(&dt->do_lu), attr, hint, dof,
143                              update->dur_batchid);
144         if (rc != 0)
145                 GOTO(out, rc);
146 out:
147         if (rc)
148                 CERROR("%s: Insert update error: rc = %d\n",
149                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
150
151         return rc;
152 }
153
154 /**
155  * Implementation of dt_object_operations::do_declare_create
156  *
157  * For non-remote transaction, it will add an OUT_CREATE sub-request
158  * into the OUT RPC that will be flushed when the transaction start.
159  *
160  * \param[in] env       execution environment
161  * \param[in] dt        remote object to be created
162  * \param[in] attr      attribute of the created object
163  * \param[in] hint      creation hint
164  * \param[in] dof       creation format information
165  * \param[in] th        the transaction handle
166  *
167  * \retval              0 if the insertion succeeds.
168  * \retval              negative errno if the insertion fails.
169  */
170 int osp_md_declare_object_create(const struct lu_env *env,
171                                  struct dt_object *dt,
172                                  struct lu_attr *attr,
173                                  struct dt_allocation_hint *hint,
174                                  struct dt_object_format *dof,
175                                  struct thandle *th)
176 {
177         int rc = 0;
178
179         if (!is_only_remote_trans(th)) {
180                 rc = __osp_md_declare_object_create(env, dt, attr, hint,
181                                                     dof, th);
182
183                 CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n",
184                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
185         }
186
187         return rc;
188 }
189
190 /**
191  * Implementation of dt_object_operations::do_create
192  *
193  * For remote transaction, it will add an OUT_CREATE sub-request into
194  * the OUT RPC that will be flushed when the transaction stop.
195  *
196  * It sets necessary flags for created object. In DNE phase I,
197  * remote updates are actually executed during transaction start,
198  * i.e. the object has already been created when calling this method.
199  *
200  * \param[in] env       execution environment
201  * \param[in] dt        object to be created
202  * \param[in] attr      attribute of the created object
203  * \param[in] hint      creation hint
204  * \param[in] dof       creation format information
205  * \param[in] th        the transaction handle
206  *
207  * \retval              only return 0 for now
208  */
209 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
210                          struct lu_attr *attr, struct dt_allocation_hint *hint,
211                          struct dt_object_format *dof, struct thandle *th)
212 {
213         int rc = 0;
214
215         if (is_only_remote_trans(th)) {
216                 rc = __osp_md_declare_object_create(env, dt, attr, hint,
217                                                     dof, th);
218
219                 CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n",
220                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
221         }
222
223         if (rc == 0) {
224                 dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS |
225                                                  (attr->la_mode & S_IFMT);
226                 dt2osp_obj(dt)->opo_non_exist = 0;
227         }
228
229         return rc;
230 }
231
232 /**
233  * Add OUT_REF_DEL sub-request into the OUT RPC.
234  *
235  * \param[in] env       execution environment
236  * \param[in] dt        object to decrease the reference count.
237  * \param[in] th        the transaction handle of refcount decrease.
238  *
239  * \retval              0 if the insertion succeeds.
240  * \retval              negative errno if the insertion fails.
241  */
242 static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
243                             struct thandle *th)
244 {
245         struct dt_update_request        *update;
246         int                             rc;
247
248         update = dt_update_request_find_or_create(th, dt);
249         if (IS_ERR(update)) {
250                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
251                        dt->do_lu.lo_dev->ld_obd->obd_name,
252                       (int)PTR_ERR(update));
253                 return PTR_ERR(update);
254         }
255
256         rc = out_ref_del_pack(env, &update->dur_buf,
257                               lu_object_fid(&dt->do_lu),
258                               update->dur_batchid);
259         return rc;
260 }
261
262 /**
263  * Implementation of dt_object_operations::do_declare_ref_del
264  *
265  * For non-remote transaction, it will add an OUT_REF_DEL sub-request
266  * into the OUT RPC that will be flushed when the transaction start.
267  *
268  * \param[in] env       execution environment
269  * \param[in] dt        object to decrease the reference count.
270  * \param[in] th        the transaction handle of refcount decrease.
271  *
272  * \retval              0 if the insertion succeeds.
273  * \retval              negative errno if the insertion fails.
274  */
275 static int osp_md_declare_ref_del(const struct lu_env *env,
276                                   struct dt_object *dt, struct thandle *th)
277 {
278         int rc = 0;
279
280         if (!is_only_remote_trans(th)) {
281                 rc = __osp_md_ref_del(env, dt, th);
282
283                 CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n",
284                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
285         }
286
287         return rc;
288 }
289
290 /**
291  * Implementation of dt_object_operations::do_ref_del
292  *
293  * For remote transaction, it will add an OUT_REF_DEL sub-request into
294  * the OUT RPC that will be flushed when the transaction stop.
295  *
296  * \param[in] env       execution environment
297  * \param[in] dt        object to decrease the reference count
298  * \param[in] th        the transaction handle
299  *
300  * \retval              only return 0 for now
301  */
302 static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
303                           struct thandle *th)
304 {
305         int rc = 0;
306
307         if (is_only_remote_trans(th)) {
308                 rc = __osp_md_ref_del(env, dt, th);
309
310                 CDEBUG(D_INFO, "ref del "DFID": rc = %d\n",
311                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
312         }
313
314         return rc;
315 }
316
317 /**
318  * Add OUT_REF_ADD sub-request into the OUT RPC.
319  *
320  * \param[in] env       execution environment
321  * \param[in] dt        object on which to increase the reference count.
322  * \param[in] th        the transaction handle.
323  *
324  * \retval              0 if the insertion succeeds.
325  * \retval              negative errno if the insertion fails.
326  */
327 static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
328                             struct thandle *th)
329 {
330         struct dt_update_request        *update;
331         int                             rc;
332
333         update = dt_update_request_find_or_create(th, dt);
334         if (IS_ERR(update)) {
335                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
336                        dt->do_lu.lo_dev->ld_obd->obd_name,
337                        (int)PTR_ERR(update));
338                 return PTR_ERR(update);
339         }
340
341         rc = out_ref_add_pack(env, &update->dur_buf,
342                               lu_object_fid(&dt->do_lu),
343                               update->dur_batchid);
344
345         return rc;
346 }
347
348 /**
349  * Implementation of dt_object_operations::do_declare_ref_del
350  *
351  * For non-remote transaction, it will add an OUT_REF_ADD sub-request
352  * into the OUT RPC that will be flushed when the transaction start.
353  *
354  * \param[in] env       execution environment
355  * \param[in] dt        object on which to increase the reference count.
356  * \param[in] th        the transaction handle.
357  *
358  * \retval              0 if the insertion succeeds.
359  * \retval              negative errno if the insertion fails.
360  */
361 static int osp_md_declare_ref_add(const struct lu_env *env,
362                                   struct dt_object *dt, struct thandle *th)
363 {
364         int rc = 0;
365
366         if (!is_only_remote_trans(th)) {
367                 rc = __osp_md_ref_add(env, dt, th);
368
369                 CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n",
370                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
371         }
372
373         return rc;
374 }
375
376 /**
377  * Implementation of dt_object_operations::do_ref_add
378  *
379  * For remote transaction, it will add an OUT_REF_ADD sub-request into
380  * the OUT RPC that will be flushed when the transaction stop.
381  *
382  * \param[in] env       execution environment
383  * \param[in] dt        object on which to increase the reference count
384  * \param[in] th        the transaction handle
385  *
386  * \retval              only return 0 for now
387  */
388 static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
389                           struct thandle *th)
390 {
391         int rc = 0;
392
393         if (is_only_remote_trans(th)) {
394                 rc = __osp_md_ref_add(env, dt, th);
395
396                 CDEBUG(D_INFO, "ref add "DFID": rc = %d\n",
397                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
398         }
399
400         return rc;
401 }
402
403 /**
404  * Implementation of dt_object_operations::do_ah_init
405  *
406  * Initialize the allocation hint for object creation, which is usually called
407  * before the creation, and these hints (parent and child mode) will be sent to
408  * the remote Object Update Target (OUT) and used in the object create process,
409  * same as OSD object creation.
410  *
411  * \param[in] env       execution environment
412  * \param[in] ah        the hint to be initialized
413  * \param[in] parent    the parent of the object
414  * \param[in] child     the object to be created
415  * \param[in] child_mode the mode of the created object
416  */
417 static void osp_md_ah_init(const struct lu_env *env,
418                            struct dt_allocation_hint *ah,
419                            struct dt_object *parent,
420                            struct dt_object *child,
421                            umode_t child_mode)
422 {
423         LASSERT(ah);
424
425         ah->dah_parent = parent;
426         ah->dah_mode = child_mode;
427 }
428
429 /**
430  * Add OUT_ATTR_SET sub-request into the OUT RPC.
431  *
432  * \param[in] env       execution environment
433  * \param[in] dt        object on which to set attributes
434  * \param[in] attr      attributes to be set
435  * \param[in] th        the transaction handle
436  *
437  * \retval              0 if the insertion succeeds.
438  * \retval              negative errno if the insertion fails.
439  */
440 int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
441                       const struct lu_attr *attr, struct thandle *th)
442 {
443         struct dt_update_request        *update;
444         int                             rc;
445
446         update = dt_update_request_find_or_create(th, dt);
447         if (IS_ERR(update)) {
448                 CERROR("%s: Get OSP update buf failed: %d\n",
449                        dt->do_lu.lo_dev->ld_obd->obd_name,
450                        (int)PTR_ERR(update));
451                 return PTR_ERR(update);
452         }
453
454         rc = out_attr_set_pack(env, &update->dur_buf,
455                                lu_object_fid(&dt->do_lu), attr,
456                                update->dur_batchid);
457
458         return rc;
459 }
460
461 /**
462  * Implementation of dt_object_operations::do_declare_attr_get
463  *
464  * Declare setting attributes to the specified remote object.
465  *
466  * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET
467  * sub-request into the OUT RPC that will be flushed when the transaction start.
468  *
469  * \param[in] env       execution environment
470  * \param[in] dt        object on which to set attributes
471  * \param[in] attr      attributes to be set
472  * \param[in] th        the transaction handle
473  *
474  * \retval              0 if the insertion succeeds.
475  * \retval              negative errno if the insertion fails.
476  */
477 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
478                             const struct lu_attr *attr, struct thandle *th)
479 {
480         int rc = 0;
481
482         if (!is_only_remote_trans(th)) {
483                 rc = __osp_md_attr_set(env, dt, attr, th);
484
485                 CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n",
486                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
487         }
488
489         return rc;
490 }
491
492 /**
493  * Implementation of dt_object_operations::do_attr_set
494  *
495  * Set attributes to the specified remote object.
496  *
497  * If the transaction is a remote transaction, then add the OUT_ATTR_SET
498  * sub-request into the OUT RPC that will be flushed when the transaction stop.
499  *
500  * \param[in] env       execution environment
501  * \param[in] dt        object to set attributes
502  * \param[in] attr      attributes to be set
503  * \param[in] th        the transaction handle
504  *
505  * \retval              only return 0 for now
506  */
507 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
508                     const struct lu_attr *attr, struct thandle *th)
509 {
510         int rc = 0;
511
512         if (is_only_remote_trans(th)) {
513                 rc = __osp_md_attr_set(env, dt, attr, th);
514
515                 CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n",
516                        PFID(&dt->do_lu.lo_header->loh_fid), rc);
517         }
518
519         return rc;
520 }
521
522 /**
523  * Implementation of dt_object_operations::do_read_lock
524  *
525  * osp_md_object_{read,write}_lock() will only lock the remote object in the
526  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
527  * lock the object. Note: it will not lock the object in the whole cluster,
528  * which relies on the LDLM lock.
529  *
530  * \param[in] env       execution environment
531  * \param[in] dt        object to be locked
532  * \param[in] role      lock role from MDD layer, see mdd_object_role().
533  */
534 static void osp_md_object_read_lock(const struct lu_env *env,
535                                     struct dt_object *dt, unsigned role)
536 {
537         struct osp_object  *obj = dt2osp_obj(dt);
538
539         LASSERT(obj->opo_owner != env);
540         down_read_nested(&obj->opo_sem, role);
541
542         LASSERT(obj->opo_owner == NULL);
543 }
544
545 /**
546  * Implementation of dt_object_operations::do_write_lock
547  *
548  * Lock the remote object in write mode.
549  *
550  * \param[in] env       execution environment
551  * \param[in] dt        object to be locked
552  * \param[in] role      lock role from MDD layer, see mdd_object_role().
553  */
554 static void osp_md_object_write_lock(const struct lu_env *env,
555                                      struct dt_object *dt, unsigned role)
556 {
557         struct osp_object *obj = dt2osp_obj(dt);
558
559         down_write_nested(&obj->opo_sem, role);
560
561         LASSERT(obj->opo_owner == NULL);
562         obj->opo_owner = env;
563 }
564
565 /**
566  * Implementation of dt_object_operations::do_read_unlock
567  *
568  * Unlock the read lock of remote object.
569  *
570  * \param[in] env       execution environment
571  * \param[in] dt        object to be unlocked
572  */
573 static void osp_md_object_read_unlock(const struct lu_env *env,
574                                       struct dt_object *dt)
575 {
576         struct osp_object *obj = dt2osp_obj(dt);
577
578         up_read(&obj->opo_sem);
579 }
580
581 /**
582  * Implementation of dt_object_operations::do_write_unlock
583  *
584  * Unlock the write lock of remote object.
585  *
586  * \param[in] env       execution environment
587  * \param[in] dt        object to be unlocked
588  */
589 static void osp_md_object_write_unlock(const struct lu_env *env,
590                                        struct dt_object *dt)
591 {
592         struct osp_object *obj = dt2osp_obj(dt);
593
594         LASSERT(obj->opo_owner == env);
595         obj->opo_owner = NULL;
596         up_write(&obj->opo_sem);
597 }
598
599 /**
600  * Implementation of dt_object_operations::do_write_locked
601  *
602  * Test if the object is locked in write mode.
603  *
604  * \param[in] env       execution environment
605  * \param[in] dt        object to be tested
606  */
607 static int osp_md_object_write_locked(const struct lu_env *env,
608                                       struct dt_object *dt)
609 {
610         struct osp_object *obj = dt2osp_obj(dt);
611
612         return obj->opo_owner == env;
613 }
614
615 /**
616  * Implementation of dt_index_operations::dio_lookup
617  *
618  * Look up record by key under a remote index object. It packs lookup update
619  * into RPC, sends to the remote OUT and waits for the lookup result.
620  *
621  * \param[in] env       execution environment
622  * \param[in] dt        index object to lookup
623  * \param[out] rec      record in which to return lookup result
624  * \param[in] key       key of index which will be looked up
625  *
626  * \retval              1 if the lookup succeeds.
627  * \retval              negative errno if the lookup fails.
628  */
629 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
630                                struct dt_rec *rec, const struct dt_key *key)
631 {
632         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
633         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
634         struct dt_device        *dt_dev = &osp->opd_dt_dev;
635         struct dt_update_request   *update;
636         struct object_update_reply *reply;
637         struct ptlrpc_request      *req = NULL;
638         struct lu_fid              *fid;
639         int                        rc;
640         ENTRY;
641
642         /* Because it needs send the update buffer right away,
643          * just create an update buffer, instead of attaching the
644          * update_remote list of the thandle.
645          */
646         update = dt_update_request_create(dt_dev);
647         if (IS_ERR(update))
648                 RETURN(PTR_ERR(update));
649
650         rc = out_index_lookup_pack(env, &update->dur_buf,
651                                    lu_object_fid(&dt->do_lu), rec, key);
652         if (rc != 0) {
653                 CERROR("%s: Insert update error: rc = %d\n",
654                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
655                 GOTO(out, rc);
656         }
657
658         rc = osp_remote_sync(env, osp, update, &req, false);
659         if (rc < 0)
660                 GOTO(out, rc);
661
662         reply = req_capsule_server_sized_get(&req->rq_pill,
663                                              &RMF_OUT_UPDATE_REPLY,
664                                              OUT_UPDATE_REPLY_SIZE);
665         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
666                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
667                        dt_dev->dd_lu_dev.ld_obd->obd_name,
668                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
669                 GOTO(out, rc = -EPROTO);
670         }
671
672         rc = object_update_result_data_get(reply, lbuf, 0);
673         if (rc < 0)
674                 GOTO(out, rc);
675
676         if (lbuf->lb_len != sizeof(*fid)) {
677                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
678                        dt_dev->dd_lu_dev.ld_obd->obd_name,
679                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
680                        (int)lbuf->lb_len);
681                 GOTO(out, rc = -EINVAL);
682         }
683
684         fid = lbuf->lb_buf;
685         if (ptlrpc_rep_need_swab(req))
686                 lustre_swab_lu_fid(fid);
687         if (!fid_is_sane(fid)) {
688                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
689                        dt_dev->dd_lu_dev.ld_obd->obd_name,
690                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
691                 GOTO(out, rc = -EINVAL);
692         }
693
694         memcpy(rec, fid, sizeof(*fid));
695
696         GOTO(out, rc = 1);
697
698 out:
699         if (req != NULL)
700                 ptlrpc_req_finished(req);
701
702         dt_update_request_destroy(update);
703
704         return rc;
705 }
706
707 /**
708  * Add OUT_INDEX_INSERT sub-request into the OUT RPC.
709  *
710  * \param[in] env       execution environment
711  * \param[in] dt        object for which to insert index
712  * \param[in] rec       record of the index which will be inserted
713  * \param[in] key       key of the index which will be inserted
714  * \param[in] th        the transaction handle
715  *
716  * \retval              0 if the insertion succeeds.
717  * \retval              negative errno if the insertion fails.
718  */
719 static int __osp_md_index_insert(const struct lu_env *env,
720                                  struct dt_object *dt,
721                                  const struct dt_rec *rec,
722                                  const struct dt_key *key,
723                                  struct thandle *th)
724 {
725         struct dt_update_request *update;
726         int                      rc;
727
728         update = dt_update_request_find_or_create(th, dt);
729         if (IS_ERR(update)) {
730                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
731                        dt->do_lu.lo_dev->ld_obd->obd_name,
732                        (int)PTR_ERR(update));
733                 return PTR_ERR(update);
734         }
735
736         rc = out_index_insert_pack(env, &update->dur_buf,
737                                    lu_object_fid(&dt->do_lu), rec, key,
738                                    update->dur_batchid);
739         return rc;
740 }
741
742 /**
743  * Implementation of dt_index_operations::dio_declare_insert
744  *
745  * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request
746  * into the OUT RPC that will be flushed when the transaction start.
747  *
748  * \param[in] env       execution environment
749  * \param[in] dt        object for which to insert index
750  * \param[in] rec       record of the index which will be inserted
751  * \param[in] key       key of the index which will be inserted
752  * \param[in] th        the transaction handle
753  *
754  * \retval              0 if the insertion succeeds.
755  * \retval              negative errno if the insertion fails.
756  */
757 static int osp_md_declare_index_insert(const struct lu_env *env,
758                                        struct dt_object *dt,
759                                        const struct dt_rec *rec,
760                                        const struct dt_key *key,
761                                        struct thandle *th)
762 {
763         int rc = 0;
764
765         if (!is_only_remote_trans(th)) {
766                 rc = __osp_md_index_insert(env, dt, rec, key, th);
767
768                 CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID
769                        ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
770                        (char *)key,
771                        PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
772         }
773
774         return rc;
775 }
776
777 /**
778  * Implementation of dt_index_operations::dio_insert
779  *
780  * For remote transaction, it will add an OUT_INDEX_INSERT sub-request
781  * into the OUT RPC that will be flushed when the transaction stop.
782  *
783  * \param[in] env       execution environment
784  * \param[in] dt        object for which to insert index
785  * \param[in] rec       record of the index to be inserted
786  * \param[in] key       key of the index to be inserted
787  * \param[in] th        the transaction handle
788  * \param[in] ignore_quota quota enforcement for insert
789  *
790  * \retval              only return 0 for now
791  */
792 static int osp_md_index_insert(const struct lu_env *env,
793                                struct dt_object *dt,
794                                const struct dt_rec *rec,
795                                const struct dt_key *key,
796                                struct thandle *th,
797                                int ignore_quota)
798 {
799         int rc = 0;
800
801         if (is_only_remote_trans(th)) {
802                 rc = __osp_md_index_insert(env, dt, rec, key, th);
803
804                 CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID
805                        ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
806                        (char *)key,
807                        PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
808         }
809
810         return rc;
811 }
812
813 /**
814  * Add OUT_INDEX_DELETE sub-request into the OUT RPC.
815  *
816  * \param[in] env       execution environment
817  * \param[in] dt        object for which to delete index
818  * \param[in] key       key of the index
819  * \param[in] th        the transaction handle
820  *
821  * \retval              0 if the insertion succeeds.
822  * \retval              negative errno if the insertion fails.
823  */
824 static int __osp_md_index_delete(const struct lu_env *env,
825                                  struct dt_object *dt,
826                                  const struct dt_key *key,
827                                  struct thandle *th)
828 {
829         struct dt_update_request *update;
830         int                      rc;
831
832         update = dt_update_request_find_or_create(th, dt);
833         if (IS_ERR(update)) {
834                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
835                        dt->do_lu.lo_dev->ld_obd->obd_name,
836                        (int)PTR_ERR(update));
837                 return PTR_ERR(update);
838         }
839
840         rc = out_index_delete_pack(env, &update->dur_buf,
841                                    lu_object_fid(&dt->do_lu), key,
842                                    update->dur_batchid);
843         return rc;
844 }
845
846 /**
847  * Implementation of dt_index_operations::dio_declare_delete
848  *
849  * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request
850  * into the OUT RPC that will be flushed when the transaction start.
851  *
852  * \param[in] env       execution environment
853  * \param[in] dt        object for which to delete index
854  * \param[in] key       key of the index
855  * \param[in] th        the transaction handle
856  *
857  * \retval              0 if the insertion succeeds.
858  * \retval              negative errno if the insertion fails.
859  */
860 static int osp_md_declare_index_delete(const struct lu_env *env,
861                                        struct dt_object *dt,
862                                        const struct dt_key *key,
863                                        struct thandle *th)
864 {
865         int rc = 0;
866
867         if (!is_only_remote_trans(th)) {
868                 rc = __osp_md_index_delete(env, dt, key, th);
869
870                 CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n",
871                        PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
872         }
873
874         return rc;
875 }
876
877 /**
878  * Implementation of dt_index_operations::dio_delete
879  *
880  * For remote transaction, it will add an OUT_INDEX_DELETE sub-request
881  * into the OUT RPC that will be flushed when the transaction stop.
882  *
883  * \param[in] env       execution environment
884  * \param[in] dt        object for which to delete index
885  * \param[in] key       key of the index which will be deleted
886  * \param[in] th        the transaction handle
887  *
888  * \retval              only return 0 for now
889  */
890 static int osp_md_index_delete(const struct lu_env *env,
891                                struct dt_object *dt,
892                                const struct dt_key *key,
893                                struct thandle *th)
894 {
895         int rc = 0;
896
897         if (is_only_remote_trans(th)) {
898                 rc = __osp_md_index_delete(env, dt, key, th);
899
900                 CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n",
901                        PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
902         }
903
904         return rc;
905 }
906
907 /**
908  * Implementation of dt_index_operations::dio_it.next
909  *
910  * Advance the pointer of the iterator to the next entry. It shares a similar
911  * internal implementation with osp_orphan_it_next(), which is being used for
912  * remote orphan index object. This method will be used for remote directory.
913  *
914  * \param[in] env       execution environment
915  * \param[in] di        iterator of this iteration
916  *
917  * \retval              0 if the pointer is advanced successfuly.
918  * \retval              1 if it reaches to the end of the index object.
919  * \retval              negative errno if the pointer cannot be advanced.
920  */
921 static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
922 {
923         struct osp_it           *it = (struct osp_it *)di;
924         struct lu_idxpage       *idxpage;
925         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
926         int                     rc;
927         ENTRY;
928
929 again:
930         idxpage = it->ooi_cur_idxpage;
931         if (idxpage != NULL) {
932                 if (idxpage->lip_nr == 0)
933                         RETURN(1);
934
935                 it->ooi_pos_ent++;
936                 if (ent == NULL) {
937                         it->ooi_ent =
938                               (struct lu_dirent *)idxpage->lip_entries;
939                         RETURN(0);
940                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
941                            it->ooi_pos_ent < idxpage->lip_nr) {
942                         ent = (struct lu_dirent *)(((char *)ent) +
943                                         le16_to_cpu(ent->lde_reclen));
944                         it->ooi_ent = ent;
945                         RETURN(0);
946                 } else {
947                         it->ooi_ent = NULL;
948                 }
949         }
950
951         rc = osp_it_next_page(env, di);
952         if (rc == 0)
953                 goto again;
954
955         RETURN(rc);
956 }
957
958 /**
959  * Implementation of dt_index_operations::dio_it.key
960  *
961  * Get the key at current iterator poisiton. These iteration methods
962  * (dio_it) will only be used for iterating the remote directory, so
963  * the key is the name of the directory entry.
964  *
965  * \param[in] env       execution environment
966  * \param[in] di        iterator of this iteration
967  *
968  * \retval              name of the current entry
969  */
970 static struct dt_key *osp_it_key(const struct lu_env *env,
971                                  const struct dt_it *di)
972 {
973         struct osp_it           *it = (struct osp_it *)di;
974         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
975
976         return (struct dt_key *)ent->lde_name;
977 }
978
979 /**
980  * Implementation of dt_index_operations::dio_it.key_size
981  *
982  * Get the key size at current iterator poisiton. These iteration methods
983  * (dio_it) will only be used for iterating the remote directory, so the key
984  * size is the name size of the directory entry.
985  *
986  * \param[in] env       execution environment
987  * \param[in] di        iterator of this iteration
988  *
989  * \retval              name size of the current entry
990  */
991
992 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
993 {
994         struct osp_it           *it = (struct osp_it *)di;
995         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
996
997         return (int)le16_to_cpu(ent->lde_namelen);
998 }
999
1000 /**
1001  * Implementation of dt_index_operations::dio_it.rec
1002  *
1003  * Get the record at current iterator position. These iteration methods
1004  * (dio_it) will only be used for iterating the remote directory, so it
1005  * uses lu_dirent_calc_size() to calculate the record size.
1006  *
1007  * \param[in] env       execution environment
1008  * \param[in] di        iterator of this iteration
1009  * \param[out] rec      the record to be returned
1010  * \param[in] attr      attributes of the index object, so it knows
1011  *                      how to pack the entry.
1012  *
1013  * \retval              only return 0 for now
1014  */
1015 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1016                                struct dt_rec *rec, __u32 attr)
1017 {
1018         struct osp_it           *it = (struct osp_it *)di;
1019         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
1020         size_t                  reclen;
1021
1022         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
1023         memcpy(rec, ent, reclen);
1024         return 0;
1025 }
1026
1027 /**
1028  * Implementation of dt_index_operations::dio_it.load
1029  *
1030  * Locate the iteration cursor to the specified position (cookie).
1031  *
1032  * \param[in] env       pointer to the thread context
1033  * \param[in] di        pointer to the iteration structure
1034  * \param[in] hash      the specified position
1035  *
1036  * \retval              positive number for locating to the exactly position
1037  *                      or the next
1038  * \retval              0 for arriving at the end of the iteration
1039  * \retval              negative error number on failure
1040  */
1041 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
1042                        __u64 hash)
1043 {
1044         struct osp_it   *it     = (struct osp_it *)di;
1045         int              rc;
1046
1047         it->ooi_next = hash;
1048         rc = osp_md_index_it_next(env, (struct dt_it *)di);
1049         if (rc == 1)
1050                 return 0;
1051
1052         if (rc == 0)
1053                 return 1;
1054
1055         return rc;
1056 }
1057
1058 const struct dt_index_operations osp_md_index_ops = {
1059         .dio_lookup         = osp_md_index_lookup,
1060         .dio_declare_insert = osp_md_declare_index_insert,
1061         .dio_insert         = osp_md_index_insert,
1062         .dio_declare_delete = osp_md_declare_index_delete,
1063         .dio_delete         = osp_md_index_delete,
1064         .dio_it     = {
1065                 .init     = osp_it_init,
1066                 .fini     = osp_it_fini,
1067                 .get      = osp_it_get,
1068                 .put      = osp_it_put,
1069                 .next     = osp_md_index_it_next,
1070                 .key      = osp_it_key,
1071                 .key_size = osp_it_key_size,
1072                 .rec      = osp_md_index_it_rec,
1073                 .store    = osp_it_store,
1074                 .load     = osp_it_load,
1075                 .key_rec  = osp_it_key_rec,
1076         }
1077 };
1078
1079 /**
1080  * Implementation of dt_object_operations::do_index_try
1081  *
1082  * Try to initialize the index API pointer for the given object. This
1083  * is the entry point of the index API, i.e. we must call this method
1084  * to initialize the index object before calling other index methods.
1085  *
1086  * \param[in] env       execution environment
1087  * \param[in] dt        index object to be initialized
1088  * \param[in] feat      the index feature of the object
1089  *
1090  * \retval              0 if the initialization succeeds.
1091  * \retval              negative errno if the initialization fails.
1092  */
1093 static int osp_md_index_try(const struct lu_env *env,
1094                             struct dt_object *dt,
1095                             const struct dt_index_features *feat)
1096 {
1097         dt->do_index_ops = &osp_md_index_ops;
1098         return 0;
1099 }
1100
1101 /**
1102  * Implementation of dt_object_operations::do_object_lock
1103  *
1104  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
1105  * which will lock the object in the global namespace.
1106  *
1107  * \param[in] env       execution environment
1108  * \param[in] dt        object to be locked
1109  * \param[out] lh       lock handle
1110  * \param[in] einfo     enqueue information
1111  * \param[in] policy    lock policy
1112  *
1113  * \retval              ELDLM_OK if locking the object succeeds.
1114  * \retval              negative errno if locking fails.
1115  */
1116 static int osp_md_object_lock(const struct lu_env *env,
1117                               struct dt_object *dt,
1118                               struct lustre_handle *lh,
1119                               struct ldlm_enqueue_info *einfo,
1120                               ldlm_policy_data_t *policy)
1121 {
1122         struct ldlm_res_id      *res_id;
1123         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1124         struct osp_device       *osp = dt2osp_dev(dt_dev);
1125         struct ptlrpc_request   *req;
1126         int                     rc = 0;
1127         __u64                   flags = 0;
1128         ldlm_mode_t             mode;
1129
1130         res_id = einfo->ei_res_id;
1131         LASSERT(res_id != NULL);
1132
1133         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
1134                                LDLM_FL_BLOCK_GRANTED, res_id,
1135                                einfo->ei_type, policy,
1136                                einfo->ei_mode, lh, 0);
1137         if (mode > 0)
1138                 return ELDLM_OK;
1139
1140         req = ldlm_enqueue_pack(osp->opd_exp, 0);
1141         if (IS_ERR(req))
1142                 RETURN(PTR_ERR(req));
1143
1144         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
1145                               (const ldlm_policy_data_t *)policy,
1146                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
1147
1148         ptlrpc_req_finished(req);
1149
1150         return rc == ELDLM_OK ? 0 : -EIO;
1151 }
1152
1153 /**
1154  * Implementation of dt_object_operations::do_object_unlock
1155  *
1156  * Cancel a lock of a remote object.
1157  *
1158  * \param[in] env       execution environment
1159  * \param[in] dt        object to be unlocked
1160  * \param[in] einfo     lock enqueue information
1161  * \param[in] policy    lock policy
1162  *
1163  * \retval              Only return 0 for now.
1164  */
1165 static int osp_md_object_unlock(const struct lu_env *env,
1166                                 struct dt_object *dt,
1167                                 struct ldlm_enqueue_info *einfo,
1168                                 ldlm_policy_data_t *policy)
1169 {
1170         struct lustre_handle    *lockh = einfo->ei_cbdata;
1171
1172         /* unlock finally */
1173         ldlm_lock_decref(lockh, einfo->ei_mode);
1174
1175         return 0;
1176 }
1177
1178 struct dt_object_operations osp_md_obj_ops = {
1179         .do_read_lock         = osp_md_object_read_lock,
1180         .do_write_lock        = osp_md_object_write_lock,
1181         .do_read_unlock       = osp_md_object_read_unlock,
1182         .do_write_unlock      = osp_md_object_write_unlock,
1183         .do_write_locked      = osp_md_object_write_locked,
1184         .do_declare_create    = osp_md_declare_object_create,
1185         .do_create            = osp_md_object_create,
1186         .do_declare_ref_add   = osp_md_declare_ref_add,
1187         .do_ref_add           = osp_md_ref_add,
1188         .do_declare_ref_del   = osp_md_declare_ref_del,
1189         .do_ref_del           = osp_md_ref_del,
1190         .do_declare_destroy   = osp_declare_object_destroy,
1191         .do_destroy           = osp_object_destroy,
1192         .do_ah_init           = osp_md_ah_init,
1193         .do_attr_get          = osp_attr_get,
1194         .do_declare_attr_set  = osp_md_declare_attr_set,
1195         .do_attr_set          = osp_md_attr_set,
1196         .do_xattr_get         = osp_xattr_get,
1197         .do_declare_xattr_set = osp_declare_xattr_set,
1198         .do_xattr_set         = osp_xattr_set,
1199         .do_declare_xattr_del = osp_declare_xattr_del,
1200         .do_xattr_del         = osp_xattr_del,
1201         .do_index_try         = osp_md_index_try,
1202         .do_object_lock       = osp_md_object_lock,
1203         .do_object_unlock     = osp_md_object_unlock,
1204 };
1205
1206 /**
1207  * Implementation of dt_body_operations::dbo_declare_write
1208  *
1209  * Declare an object write. In DNE phase I, it will pack the write
1210  * object update into the RPC.
1211  *
1212  * \param[in] env       execution environment
1213  * \param[in] dt        object to be written
1214  * \param[in] buf       buffer to write which includes an embedded size field
1215  * \param[in] pos       offet in the object to start writing at
1216  * \param[in] th        transaction handle
1217  *
1218  * \retval              0 if the insertion succeeds.
1219  * \retval              negative errno if the insertion fails.
1220  */
1221 static ssize_t osp_md_declare_write(const struct lu_env *env,
1222                                     struct dt_object *dt,
1223                                     const struct lu_buf *buf,
1224                                     loff_t pos, struct thandle *th)
1225 {
1226         struct dt_update_request  *update;
1227         ssize_t                   rc;
1228
1229         update = dt_update_request_find_or_create(th, dt);
1230         if (IS_ERR(update)) {
1231                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
1232                        dt->do_lu.lo_dev->ld_obd->obd_name,
1233                        (int)PTR_ERR(update));
1234                 return PTR_ERR(update);
1235         }
1236
1237         rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
1238                             buf, pos, update->dur_batchid);
1239
1240         return rc;
1241
1242 }
1243
1244 /**
1245  * Implementation of dt_body_operations::dbo_write
1246  *
1247  * Return the buffer size. In DNE phase I, remote updates
1248  * are actually executed during transaction start, the buffer has
1249  * already been written when this method is being called.
1250  *
1251  * \param[in] env       execution environment
1252  * \param[in] dt        object to be written
1253  * \param[in] buf       buffer to write which includes an embedded size field
1254  * \param[in] pos       offet in the object to start writing at
1255  * \param[in] th        transaction handle
1256  * \param[in] ignore_quota quota enforcement for this write
1257  *
1258  * \retval              the buffer size in bytes.
1259  */
1260 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1261                             const struct lu_buf *buf, loff_t *pos,
1262                             struct thandle *handle, int ignore_quota)
1263 {
1264         *pos += buf->lb_len;
1265         return buf->lb_len;
1266 }
1267
1268 /* These body operation will be used to write symlinks during migration etc */
1269 struct dt_body_operations osp_md_body_ops = {
1270         .dbo_declare_write      = osp_md_declare_write,
1271         .dbo_write              = osp_md_write,
1272 };