Whamcloud - gitweb
c32b92ff27ece61045d07f48e2aa6a61f0bd303b
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Implementation of dt_object_operations::do_declare_create
64  *
65  * Insert object create update into the RPC, which will be sent during
66  * transaction start. Note: if the object has already been created,
67  * we must add object destroy updates ahead of create updates, so it will
68  * destroy then recreate the object.
69  *
70  * \param[in] env       execution environment
71  * \param[in] dt        remote object to be created
72  * \param[in] attr      attribute of the created object
73  * \param[in] hint      creation hint
74  * \param[in] dof       creation format information
75  * \param[in] th        the transaction handle
76  *
77  * \retval              0 if the insertion succeeds.
78  * \retval              negative errno if the insertion fails.
79  */
80 int osp_md_declare_object_create(const struct lu_env *env,
81                                  struct dt_object *dt,
82                                  struct lu_attr *attr,
83                                  struct dt_allocation_hint *hint,
84                                  struct dt_object_format *dof,
85                                  struct thandle *th)
86 {
87         struct dt_update_request        *update;
88         int                             rc;
89
90         update = dt_update_request_find_or_create(th, dt);
91         if (IS_ERR(update)) {
92                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
93                        dt->do_lu.lo_dev->ld_obd->obd_name,
94                        (int)PTR_ERR(update));
95                 return PTR_ERR(update);
96         }
97
98         if (lu_object_exists(&dt->do_lu)) {
99                 /* If the object already exists, we needs to destroy
100                  * this orphan object first.
101                  *
102                  * The scenario might happen in this case
103                  *
104                  * 1. client send remote create to MDT0.
105                  * 2. MDT0 send create update to MDT1.
106                  * 3. MDT1 finished create synchronously.
107                  * 4. MDT0 failed and reboot.
108                  * 5. client resend remote create to MDT0.
109                  * 6. MDT0 tries to resend create update to MDT1,
110                  *    but find the object already exists
111                  */
112                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
113                        dt->do_lu.lo_dev->ld_obd->obd_name,
114                        PFID(lu_object_fid(&dt->do_lu)));
115
116                 rc = out_ref_del_pack(env, &update->dur_buf,
117                                       lu_object_fid(&dt->do_lu),
118                                       update->dur_batchid);
119                 if (rc != 0)
120                         GOTO(out, rc);
121
122                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
123                         /* decrease for ".." */
124                         rc = out_ref_del_pack(env, &update->dur_buf,
125                                               lu_object_fid(&dt->do_lu),
126                                               update->dur_batchid);
127                         if (rc != 0)
128                                 GOTO(out, rc);
129                 }
130
131                 rc = out_object_destroy_pack(env, &update->dur_buf,
132                                              lu_object_fid(&dt->do_lu),
133                                              update->dur_batchid);
134                 if (rc != 0)
135                         GOTO(out, rc);
136
137                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
138                 /* Increase batchid to add this orphan object deletion
139                  * to separate transaction */
140                 update_inc_batchid(update);
141         }
142
143         rc = out_create_pack(env, &update->dur_buf,
144                              lu_object_fid(&dt->do_lu), attr, hint, dof,
145                              update->dur_batchid);
146         if (rc != 0)
147                 GOTO(out, rc);
148 out:
149         if (rc)
150                 CERROR("%s: Insert update error: rc = %d\n",
151                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
152
153         return rc;
154 }
155
156 /**
157  * Implementation of dt_object_operations::do_create
158  *
159  * It sets necessary flags for created object. In DNE phase I,
160  * remote updates are actually executed during transaction start,
161  * i.e. the object has already been created when calling this method.
162  *
163  * \param[in] env       execution environment
164  * \param[in] dt        object to be created
165  * \param[in] attr      attribute of the created object
166  * \param[in] hint      creation hint
167  * \param[in] dof       creation format information
168  * \param[in] th        the transaction handle
169  *
170  * \retval              only return 0 for now
171  */
172 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
173                          struct lu_attr *attr, struct dt_allocation_hint *hint,
174                          struct dt_object_format *dof, struct thandle *th)
175 {
176         CDEBUG(D_INFO, "create object "DFID"\n",
177                PFID(&dt->do_lu.lo_header->loh_fid));
178
179         /* Because the create update RPC will be sent during declare phase,
180          * if creation reaches here, it means the object has been created
181          * successfully */
182         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
183         dt2osp_obj(dt)->opo_non_exist = 0;
184
185         return 0;
186 }
187
188 /**
189  * Implementation of dt_object_operations::do_declare_ref_del
190  *
191  * Declare decreasing the reference count of the remote object, i.e. insert
192  * decreasing object reference count update into the RPC, which will be sent
193  * during transaction start.
194  *
195  * \param[in] env       execution environment
196  * \param[in] dt        object to decrease the reference count.
197  * \param[in] th        the transaction handle of refcount decrease.
198  *
199  * \retval              0 if the insertion succeeds.
200  * \retval              negative errno if the insertion fails.
201  */
202 static int osp_md_declare_object_ref_del(const struct lu_env *env,
203                                          struct dt_object *dt,
204                                          struct thandle *th)
205 {
206         struct dt_update_request        *update;
207         int                             rc;
208
209         update = dt_update_request_find_or_create(th, dt);
210         if (IS_ERR(update)) {
211                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
212                        dt->do_lu.lo_dev->ld_obd->obd_name,
213                       (int)PTR_ERR(update));
214                 return PTR_ERR(update);
215         }
216
217         rc = out_ref_del_pack(env, &update->dur_buf,
218                               lu_object_fid(&dt->do_lu),
219                               update->dur_batchid);
220         return rc;
221 }
222
223 /**
224  * Implementation of dt_object_operations::do_ref_del
225  *
226  * Do nothing in this method for now. In DNE phase I, remote updates are
227  * actually executed during transaction start, i.e. the object reference
228  * count has already been decreased when calling this method.
229  *
230  * \param[in] env       execution environment
231  * \param[in] dt        object to decrease the reference count
232  * \param[in] th        the transaction handle
233  *
234  * \retval              only return 0 for now
235  */
236 static int osp_md_object_ref_del(const struct lu_env *env,
237                                  struct dt_object *dt,
238                                  struct thandle *th)
239 {
240         CDEBUG(D_INFO, "ref del object "DFID"\n",
241                PFID(&dt->do_lu.lo_header->loh_fid));
242
243         return 0;
244 }
245
246 /**
247  * Implementation of dt_object_operations::do_declare_ref_del
248  *
249  * Declare increasing the reference count of the remote object,
250  * i.e. insert increasing object reference count update into RPC.
251  *
252  * \param[in] env       execution environment
253  * \param[in] dt        object on which to increase the reference count.
254  * \param[in] th        the transaction handle.
255  *
256  * \retval              0 if the insertion succeeds.
257  * \retval              negative errno if the insertion fails.
258  */
259 static int osp_md_declare_ref_add(const struct lu_env *env,
260                                   struct dt_object *dt, struct thandle *th)
261 {
262         struct dt_update_request        *update;
263         int                             rc;
264
265         update = dt_update_request_find_or_create(th, dt);
266         if (IS_ERR(update)) {
267                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
268                        dt->do_lu.lo_dev->ld_obd->obd_name,
269                        (int)PTR_ERR(update));
270                 return PTR_ERR(update);
271         }
272
273         rc = out_ref_add_pack(env, &update->dur_buf,
274                               lu_object_fid(&dt->do_lu),
275                               update->dur_batchid);
276
277         return rc;
278 }
279
280 /**
281  * Implementation of dt_object_operations::do_ref_add
282  *
283  * Do nothing in this method for now. In DNE phase I, remote updates are
284  * actually executed during transaction start, i.e. the object reference
285  * count has already been increased when calling this method.
286  *
287  * \param[in] env       execution environment
288  * \param[in] dt        object on which to increase the reference count
289  * \param[in] th        the transaction handle
290  *
291  * \retval              only return 0 for now
292  */
293 static int osp_md_object_ref_add(const struct lu_env *env, struct dt_object *dt,
294                                  struct thandle *th)
295 {
296         CDEBUG(D_INFO, "ref add object "DFID"\n",
297                PFID(&dt->do_lu.lo_header->loh_fid));
298
299         return 0;
300 }
301
302 /**
303  * Implementation of dt_object_operations::do_ah_init
304  *
305  * Initialize the allocation hint for object creation, which is usually called
306  * before the creation, and these hints (parent and child mode) will be sent to
307  * the remote Object Update Target (OUT) and used in the object create process,
308  * same as OSD object creation.
309  *
310  * \param[in] env       execution environment
311  * \param[in] ah        the hint to be initialized
312  * \param[in] parent    the parent of the object
313  * \param[in] child     the object to be created
314  * \param[in] child_mode the mode of the created object
315  */
316 static void osp_md_ah_init(const struct lu_env *env,
317                            struct dt_allocation_hint *ah,
318                            struct dt_object *parent,
319                            struct dt_object *child,
320                            umode_t child_mode)
321 {
322         LASSERT(ah);
323
324         ah->dah_parent = parent;
325         ah->dah_mode = child_mode;
326 }
327
328 /**
329  * Add attr_set sub-request into the OUT RPC.
330  *
331  * \param[in] env       execution environment
332  * \param[in] dt        object on which to set attributes
333  * \param[in] attr      attributes to be set
334  * \param[in] th        the transaction handle
335  *
336  * \retval              0 if the insertion succeeds.
337  * \retval              negative errno if the insertion fails.
338  */
339 int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
340                       const struct lu_attr *attr, struct thandle *th)
341 {
342         struct dt_update_request        *update;
343         int                             rc;
344
345         update = dt_update_request_find_or_create(th, dt);
346         if (IS_ERR(update)) {
347                 CERROR("%s: Get OSP update buf failed: %d\n",
348                        dt->do_lu.lo_dev->ld_obd->obd_name,
349                        (int)PTR_ERR(update));
350                 return PTR_ERR(update);
351         }
352
353         rc = out_attr_set_pack(env, &update->dur_buf,
354                                lu_object_fid(&dt->do_lu), attr,
355                                update->dur_batchid);
356
357         return rc;
358 }
359
360 /**
361  * Implementation of dt_object_operations::do_declare_attr_get
362  *
363  * Declare setting attributes to the specified remote object.
364  *
365  * If the transaction is a remote transaction, then add the modification
366  * sub-request into the OUT RPC here, and such OUT RPC will be triggered
367  * when transaction start.
368  *
369  * \param[in] env       execution environment
370  * \param[in] dt        object on which to set attributes
371  * \param[in] attr      attributes to be set
372  * \param[in] th        the transaction handle
373  *
374  * \retval              0 if the insertion succeeds.
375  * \retval              negative errno if the insertion fails.
376  */
377 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
378                             const struct lu_attr *attr, struct thandle *th)
379 {
380         int rc = 0;
381
382         CDEBUG(D_INFO, "declare attr set object "DFID"\n",
383                PFID(&dt->do_lu.lo_header->loh_fid));
384
385         if (!is_only_remote_trans(th))
386                 rc = __osp_md_attr_set(env, dt, attr, th);
387
388         return rc;
389 }
390
391 /**
392  * Implementation of dt_object_operations::do_attr_set
393  *
394  * Set attributes to the specified remote object.
395  *
396  * If the transaction is a remote transaction, then related modification
397  * sub-request has been added in the declare phase and related OUT RPC
398  * has been triggered at transaction start. Otherwise, the modification
399  * sub-request will be added here, and related OUT RPC will be triggered
400  * when transaction stop.
401  *
402  * \param[in] env       execution environment
403  * \param[in] dt        object to set attributes
404  * \param[in] attr      attributes to be set
405  * \param[in] th        the transaction handle
406  * \param[in] capa      capability of setting attributes (not yet implemented).
407  *
408  * \retval              only return 0 for now
409  */
410 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
411                     const struct lu_attr *attr, struct thandle *th,
412                     struct lustre_capa *capa)
413 {
414         int rc = 0;
415
416         CDEBUG(D_INFO, "attr set object "DFID"\n",
417                PFID(&dt->do_lu.lo_header->loh_fid));
418
419         if (is_only_remote_trans(th))
420                 rc = __osp_md_attr_set(env, dt, attr, th);
421
422         RETURN(rc);
423 }
424
425 /**
426  * Implementation of dt_object_operations::do_read_lock
427  *
428  * osp_md_object_{read,write}_lock() will only lock the remote object in the
429  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
430  * lock the object. Note: it will not lock the object in the whole cluster,
431  * which relies on the LDLM lock.
432  *
433  * \param[in] env       execution environment
434  * \param[in] dt        object to be locked
435  * \param[in] role      lock role from MDD layer, see mdd_object_role().
436  */
437 static void osp_md_object_read_lock(const struct lu_env *env,
438                                     struct dt_object *dt, unsigned role)
439 {
440         struct osp_object  *obj = dt2osp_obj(dt);
441
442         LASSERT(obj->opo_owner != env);
443         down_read_nested(&obj->opo_sem, role);
444
445         LASSERT(obj->opo_owner == NULL);
446 }
447
448 /**
449  * Implementation of dt_object_operations::do_write_lock
450  *
451  * Lock the remote object in write mode.
452  *
453  * \param[in] env       execution environment
454  * \param[in] dt        object to be locked
455  * \param[in] role      lock role from MDD layer, see mdd_object_role().
456  */
457 static void osp_md_object_write_lock(const struct lu_env *env,
458                                      struct dt_object *dt, unsigned role)
459 {
460         struct osp_object *obj = dt2osp_obj(dt);
461
462         down_write_nested(&obj->opo_sem, role);
463
464         LASSERT(obj->opo_owner == NULL);
465         obj->opo_owner = env;
466 }
467
468 /**
469  * Implementation of dt_object_operations::do_read_unlock
470  *
471  * Unlock the read lock of remote object.
472  *
473  * \param[in] env       execution environment
474  * \param[in] dt        object to be unlocked
475  */
476 static void osp_md_object_read_unlock(const struct lu_env *env,
477                                       struct dt_object *dt)
478 {
479         struct osp_object *obj = dt2osp_obj(dt);
480
481         up_read(&obj->opo_sem);
482 }
483
484 /**
485  * Implementation of dt_object_operations::do_write_unlock
486  *
487  * Unlock the write lock of remote object.
488  *
489  * \param[in] env       execution environment
490  * \param[in] dt        object to be unlocked
491  */
492 static void osp_md_object_write_unlock(const struct lu_env *env,
493                                        struct dt_object *dt)
494 {
495         struct osp_object *obj = dt2osp_obj(dt);
496
497         LASSERT(obj->opo_owner == env);
498         obj->opo_owner = NULL;
499         up_write(&obj->opo_sem);
500 }
501
502 /**
503  * Implementation of dt_object_operations::do_write_locked
504  *
505  * Test if the object is locked in write mode.
506  *
507  * \param[in] env       execution environment
508  * \param[in] dt        object to be tested
509  */
510 static int osp_md_object_write_locked(const struct lu_env *env,
511                                       struct dt_object *dt)
512 {
513         struct osp_object *obj = dt2osp_obj(dt);
514
515         return obj->opo_owner == env;
516 }
517
518 /**
519  * Implementation of dt_index_operations::dio_lookup
520  *
521  * Look up record by key under a remote index object. It packs lookup update
522  * into RPC, sends to the remote OUT and waits for the lookup result.
523  *
524  * \param[in] env       execution environment
525  * \param[in] dt        index object to lookup
526  * \param[out] rec      record in which to return lookup result
527  * \param[in] key       key of index which will be looked up
528  * \param[in] capa      capability of lookup (not yet implemented)
529  *
530  * \retval              1 if the lookup succeeds.
531  * \retval              negative errno if the lookup fails.
532  */
533 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
534                                struct dt_rec *rec, const struct dt_key *key,
535                                struct lustre_capa *capa)
536 {
537         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
538         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
539         struct dt_device        *dt_dev = &osp->opd_dt_dev;
540         struct dt_update_request   *update;
541         struct object_update_reply *reply;
542         struct ptlrpc_request      *req = NULL;
543         struct lu_fid              *fid;
544         int                        rc;
545         ENTRY;
546
547         /* Because it needs send the update buffer right away,
548          * just create an update buffer, instead of attaching the
549          * update_remote list of the thandle.
550          */
551         update = dt_update_request_create(dt_dev);
552         if (IS_ERR(update))
553                 RETURN(PTR_ERR(update));
554
555         rc = out_index_lookup_pack(env, &update->dur_buf,
556                                    lu_object_fid(&dt->do_lu), rec, key);
557         if (rc != 0) {
558                 CERROR("%s: Insert update error: rc = %d\n",
559                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
560                 GOTO(out, rc);
561         }
562
563         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
564         if (rc < 0)
565                 GOTO(out, rc);
566
567         reply = req_capsule_server_sized_get(&req->rq_pill,
568                                              &RMF_OUT_UPDATE_REPLY,
569                                              OUT_UPDATE_REPLY_SIZE);
570         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
571                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
572                        dt_dev->dd_lu_dev.ld_obd->obd_name,
573                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
574                 GOTO(out, rc = -EPROTO);
575         }
576
577         rc = object_update_result_data_get(reply, lbuf, 0);
578         if (rc < 0)
579                 GOTO(out, rc);
580
581         if (lbuf->lb_len != sizeof(*fid)) {
582                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
583                        dt_dev->dd_lu_dev.ld_obd->obd_name,
584                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
585                        (int)lbuf->lb_len);
586                 GOTO(out, rc = -EINVAL);
587         }
588
589         fid = lbuf->lb_buf;
590         if (ptlrpc_rep_need_swab(req))
591                 lustre_swab_lu_fid(fid);
592         if (!fid_is_sane(fid)) {
593                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
594                        dt_dev->dd_lu_dev.ld_obd->obd_name,
595                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
596                 GOTO(out, rc = -EINVAL);
597         }
598
599         memcpy(rec, fid, sizeof(*fid));
600
601         GOTO(out, rc = 1);
602
603 out:
604         if (req != NULL)
605                 ptlrpc_req_finished(req);
606
607         dt_update_request_destroy(update);
608
609         return rc;
610 }
611
612 /**
613  * Implementation of dt_index_operations::dio_declare_insert
614  *
615  * Declare the index insert of the remote object, i.e. pack index insert update
616  * into the RPC, which will be sent during transaction start.
617  *
618  * \param[in] env       execution environment
619  * \param[in] dt        object for which to insert index
620  * \param[in] rec       record of the index which will be inserted
621  * \param[in] key       key of the index which will be inserted
622  * \param[in] th        the transaction handle
623  *
624  * \retval              0 if the insertion succeeds.
625  * \retval              negative errno if the insertion fails.
626  */
627 static int osp_md_declare_insert(const struct lu_env *env,
628                                  struct dt_object *dt,
629                                  const struct dt_rec *rec,
630                                  const struct dt_key *key,
631                                  struct thandle *th)
632 {
633         struct dt_update_request *update;
634         int                      rc;
635
636         update = dt_update_request_find_or_create(th, dt);
637         if (IS_ERR(update)) {
638                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
639                        dt->do_lu.lo_dev->ld_obd->obd_name,
640                        (int)PTR_ERR(update));
641                 return PTR_ERR(update);
642         }
643
644         rc = out_index_insert_pack(env, &update->dur_buf,
645                                    lu_object_fid(&dt->do_lu), rec, key,
646                                    update->dur_batchid);
647         return rc;
648 }
649
650 /**
651  * Implementation of dt_index_operations::dio_insert
652  *
653  * Do nothing in this method for now. In DNE phase I, remote updates
654  * are actually executed during transaction start, i.e. the index has
655  * already been inserted when calling this method.
656  *
657  * \param[in] env       execution environment
658  * \param[in] dt        object for which to insert index
659  * \param[in] rec       record of the index to be inserted
660  * \param[in] key       key of the index to be inserted
661  * \param[in] th        the transaction handle
662  * \param[in] capa      capability of insert (not yet implemented)
663  * \param[in] ignore_quota quota enforcement for insert
664  *
665  * \retval              only return 0 for now
666  */
667 static int osp_md_index_insert(const struct lu_env *env,
668                                struct dt_object *dt,
669                                const struct dt_rec *rec,
670                                const struct dt_key *key,
671                                struct thandle *th,
672                                struct lustre_capa *capa,
673                                int ignore_quota)
674 {
675         return 0;
676 }
677
678 /**
679  * Implementation of dt_index_operations::dio_declare_delete
680  *
681  * Declare the index delete of the remote object, i.e. insert index delete
682  * update into the RPC, which will be sent during transaction start.
683  *
684  * \param[in] env       execution environment
685  * \param[in] dt        object for which to delete index
686  * \param[in] key       key of the index
687  * \param[in] th        the transaction handle
688  *
689  * \retval              0 if the insertion succeeds.
690  * \retval              negative errno if the insertion fails.
691  */
692 static int osp_md_declare_delete(const struct lu_env *env,
693                                  struct dt_object *dt,
694                                  const struct dt_key *key,
695                                  struct thandle *th)
696 {
697         struct dt_update_request *update;
698         int                      rc;
699
700         update = dt_update_request_find_or_create(th, dt);
701         if (IS_ERR(update)) {
702                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
703                        dt->do_lu.lo_dev->ld_obd->obd_name,
704                        (int)PTR_ERR(update));
705                 return PTR_ERR(update);
706         }
707
708         rc = out_index_delete_pack(env, &update->dur_buf,
709                                    lu_object_fid(&dt->do_lu), key,
710                                    update->dur_batchid);
711         return rc;
712 }
713
714 /**
715  * Implementation of dt_index_operations::dio_delete
716  *
717  * Do nothing in this method for now. Because in DNE phase I, remote updates
718  * are actually executed during transaction start, i.e. the index has already
719  * been deleted when calling this method.
720  *
721  * \param[in] env       execution environment
722  * \param[in] dt        object for which to delete index
723  * \param[in] key       key of the index which will be deleted
724  * \param[in] th        the transaction handle
725  * \param[in] capa      capability of delete (not yet implemented)
726  *
727  * \retval              only return 0 for now
728  */
729 static int osp_md_index_delete(const struct lu_env *env,
730                                struct dt_object *dt,
731                                const struct dt_key *key,
732                                struct thandle *th,
733                                struct lustre_capa *capa)
734 {
735         CDEBUG(D_INFO, "index delete "DFID" %s\n",
736                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
737
738         return 0;
739 }
740
741 /**
742  * Implementation of dt_index_operations::dio_it.next
743  *
744  * Advance the pointer of the iterator to the next entry. It shares a similar
745  * internal implementation with osp_orphan_it_next(), which is being used for
746  * remote orphan index object. This method will be used for remote directory.
747  *
748  * \param[in] env       execution environment
749  * \param[in] di        iterator of this iteration
750  *
751  * \retval              0 if the pointer is advanced successfuly.
752  * \retval              1 if it reaches to the end of the index object.
753  * \retval              negative errno if the pointer cannot be advanced.
754  */
755 static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
756 {
757         struct osp_it           *it = (struct osp_it *)di;
758         struct lu_idxpage       *idxpage;
759         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
760         int                     rc;
761         ENTRY;
762
763 again:
764         idxpage = it->ooi_cur_idxpage;
765         if (idxpage != NULL) {
766                 if (idxpage->lip_nr == 0)
767                         RETURN(1);
768
769                 it->ooi_pos_ent++;
770                 if (ent == NULL) {
771                         it->ooi_ent =
772                               (struct lu_dirent *)idxpage->lip_entries;
773                         RETURN(0);
774                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
775                            it->ooi_pos_ent < idxpage->lip_nr) {
776                         ent = (struct lu_dirent *)(((char *)ent) +
777                                         le16_to_cpu(ent->lde_reclen));
778                         it->ooi_ent = ent;
779                         RETURN(0);
780                 } else {
781                         it->ooi_ent = NULL;
782                 }
783         }
784
785         rc = osp_it_next_page(env, di);
786         if (rc == 0)
787                 goto again;
788
789         RETURN(rc);
790 }
791
792 /**
793  * Implementation of dt_index_operations::dio_it.key
794  *
795  * Get the key at current iterator poisiton. These iteration methods
796  * (dio_it) will only be used for iterating the remote directory, so
797  * the key is the name of the directory entry.
798  *
799  * \param[in] env       execution environment
800  * \param[in] di        iterator of this iteration
801  *
802  * \retval              name of the current entry
803  */
804 static struct dt_key *osp_it_key(const struct lu_env *env,
805                                  const struct dt_it *di)
806 {
807         struct osp_it           *it = (struct osp_it *)di;
808         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
809
810         return (struct dt_key *)ent->lde_name;
811 }
812
813 /**
814  * Implementation of dt_index_operations::dio_it.key_size
815  *
816  * Get the key size at current iterator poisiton. These iteration methods
817  * (dio_it) will only be used for iterating the remote directory, so the key
818  * size is the name size of the directory entry.
819  *
820  * \param[in] env       execution environment
821  * \param[in] di        iterator of this iteration
822  *
823  * \retval              name size of the current entry
824  */
825
826 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
827 {
828         struct osp_it           *it = (struct osp_it *)di;
829         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
830
831         return (int)le16_to_cpu(ent->lde_namelen);
832 }
833
834 /**
835  * Implementation of dt_index_operations::dio_it.rec
836  *
837  * Get the record at current iterator position. These iteration methods
838  * (dio_it) will only be used for iterating the remote directory, so it
839  * uses lu_dirent_calc_size() to calculate the record size.
840  *
841  * \param[in] env       execution environment
842  * \param[in] di        iterator of this iteration
843  * \param[out] rec      the record to be returned
844  * \param[in] attr      attributes of the index object, so it knows
845  *                      how to pack the entry.
846  *
847  * \retval              only return 0 for now
848  */
849 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
850                                struct dt_rec *rec, __u32 attr)
851 {
852         struct osp_it           *it = (struct osp_it *)di;
853         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
854         size_t                  reclen;
855
856         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
857         memcpy(rec, ent, reclen);
858         return 0;
859 }
860
861 /**
862  * Implementation of dt_index_operations::dio_it.load
863  *
864  * Locate the iteration cursor to the specified position (cookie).
865  *
866  * \param[in] env       pointer to the thread context
867  * \param[in] di        pointer to the iteration structure
868  * \param[in] hash      the specified position
869  *
870  * \retval              positive number for locating to the exactly position
871  *                      or the next
872  * \retval              0 for arriving at the end of the iteration
873  * \retval              negative error number on failure
874  */
875 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
876                        __u64 hash)
877 {
878         struct osp_it   *it     = (struct osp_it *)di;
879         int              rc;
880
881         it->ooi_next = hash;
882         rc = osp_md_index_it_next(env, (struct dt_it *)di);
883         if (rc == 1)
884                 return 0;
885
886         if (rc == 0)
887                 return 1;
888
889         return rc;
890 }
891
892 const struct dt_index_operations osp_md_index_ops = {
893         .dio_lookup         = osp_md_index_lookup,
894         .dio_declare_insert = osp_md_declare_insert,
895         .dio_insert         = osp_md_index_insert,
896         .dio_declare_delete = osp_md_declare_delete,
897         .dio_delete         = osp_md_index_delete,
898         .dio_it     = {
899                 .init     = osp_it_init,
900                 .fini     = osp_it_fini,
901                 .get      = osp_it_get,
902                 .put      = osp_it_put,
903                 .next     = osp_md_index_it_next,
904                 .key      = osp_it_key,
905                 .key_size = osp_it_key_size,
906                 .rec      = osp_md_index_it_rec,
907                 .store    = osp_it_store,
908                 .load     = osp_it_load,
909                 .key_rec  = osp_it_key_rec,
910         }
911 };
912
913 /**
914  * Implementation of dt_object_operations::do_index_try
915  *
916  * Try to initialize the index API pointer for the given object. This
917  * is the entry point of the index API, i.e. we must call this method
918  * to initialize the index object before calling other index methods.
919  *
920  * \param[in] env       execution environment
921  * \param[in] dt        index object to be initialized
922  * \param[in] feat      the index feature of the object
923  *
924  * \retval              0 if the initialization succeeds.
925  * \retval              negative errno if the initialization fails.
926  */
927 static int osp_md_index_try(const struct lu_env *env,
928                             struct dt_object *dt,
929                             const struct dt_index_features *feat)
930 {
931         dt->do_index_ops = &osp_md_index_ops;
932         return 0;
933 }
934
935 /**
936  * Implementation of dt_object_operations::do_object_lock
937  *
938  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
939  * which will lock the object in the global namespace.
940  *
941  * \param[in] env       execution environment
942  * \param[in] dt        object to be locked
943  * \param[out] lh       lock handle
944  * \param[in] einfo     enqueue information
945  * \param[in] policy    lock policy
946  *
947  * \retval              ELDLM_OK if locking the object succeeds.
948  * \retval              negative errno if locking fails.
949  */
950 static int osp_md_object_lock(const struct lu_env *env,
951                               struct dt_object *dt,
952                               struct lustre_handle *lh,
953                               struct ldlm_enqueue_info *einfo,
954                               ldlm_policy_data_t *policy)
955 {
956         struct ldlm_res_id      *res_id;
957         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
958         struct osp_device       *osp = dt2osp_dev(dt_dev);
959         struct ptlrpc_request   *req;
960         int                     rc = 0;
961         __u64                   flags = 0;
962         ldlm_mode_t             mode;
963
964         res_id = einfo->ei_res_id;
965         LASSERT(res_id != NULL);
966
967         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
968                                LDLM_FL_BLOCK_GRANTED, res_id,
969                                einfo->ei_type, policy,
970                                einfo->ei_mode, lh, 0);
971         if (mode > 0)
972                 return ELDLM_OK;
973
974         req = ldlm_enqueue_pack(osp->opd_exp, 0);
975         if (IS_ERR(req))
976                 RETURN(PTR_ERR(req));
977
978         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
979                               (const ldlm_policy_data_t *)policy,
980                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
981
982         ptlrpc_req_finished(req);
983
984         return rc == ELDLM_OK ? 0 : -EIO;
985 }
986
987 /**
988  * Implementation of dt_object_operations::do_object_unlock
989  *
990  * Cancel a lock of a remote object.
991  *
992  * \param[in] env       execution environment
993  * \param[in] dt        object to be unlocked
994  * \param[in] einfo     lock enqueue information
995  * \param[in] policy    lock policy
996  *
997  * \retval              Only return 0 for now.
998  */
999 static int osp_md_object_unlock(const struct lu_env *env,
1000                                 struct dt_object *dt,
1001                                 struct ldlm_enqueue_info *einfo,
1002                                 ldlm_policy_data_t *policy)
1003 {
1004         struct lustre_handle    *lockh = einfo->ei_cbdata;
1005
1006         /* unlock finally */
1007         ldlm_lock_decref(lockh, einfo->ei_mode);
1008
1009         return 0;
1010 }
1011
1012 struct dt_object_operations osp_md_obj_ops = {
1013         .do_read_lock         = osp_md_object_read_lock,
1014         .do_write_lock        = osp_md_object_write_lock,
1015         .do_read_unlock       = osp_md_object_read_unlock,
1016         .do_write_unlock      = osp_md_object_write_unlock,
1017         .do_write_locked      = osp_md_object_write_locked,
1018         .do_declare_create    = osp_md_declare_object_create,
1019         .do_create            = osp_md_object_create,
1020         .do_declare_ref_add   = osp_md_declare_ref_add,
1021         .do_ref_add           = osp_md_object_ref_add,
1022         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1023         .do_ref_del           = osp_md_object_ref_del,
1024         .do_declare_destroy   = osp_declare_object_destroy,
1025         .do_destroy           = osp_object_destroy,
1026         .do_ah_init           = osp_md_ah_init,
1027         .do_attr_get          = osp_attr_get,
1028         .do_declare_attr_set  = osp_md_declare_attr_set,
1029         .do_attr_set          = osp_md_attr_set,
1030         .do_xattr_get         = osp_xattr_get,
1031         .do_declare_xattr_set = osp_declare_xattr_set,
1032         .do_xattr_set         = osp_xattr_set,
1033         .do_declare_xattr_del = osp_declare_xattr_del,
1034         .do_xattr_del         = osp_xattr_del,
1035         .do_index_try         = osp_md_index_try,
1036         .do_object_lock       = osp_md_object_lock,
1037         .do_object_unlock     = osp_md_object_unlock,
1038 };
1039
1040 /**
1041  * Implementation of dt_body_operations::dbo_declare_write
1042  *
1043  * Declare an object write. In DNE phase I, it will pack the write
1044  * object update into the RPC.
1045  *
1046  * \param[in] env       execution environment
1047  * \param[in] dt        object to be written
1048  * \param[in] buf       buffer to write which includes an embedded size field
1049  * \param[in] pos       offet in the object to start writing at
1050  * \param[in] th        transaction handle
1051  *
1052  * \retval              0 if the insertion succeeds.
1053  * \retval              negative errno if the insertion fails.
1054  */
1055 static ssize_t osp_md_declare_write(const struct lu_env *env,
1056                                     struct dt_object *dt,
1057                                     const struct lu_buf *buf,
1058                                     loff_t pos, struct thandle *th)
1059 {
1060         struct dt_update_request  *update;
1061         ssize_t                   rc;
1062
1063         update = dt_update_request_find_or_create(th, dt);
1064         if (IS_ERR(update)) {
1065                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
1066                        dt->do_lu.lo_dev->ld_obd->obd_name,
1067                        (int)PTR_ERR(update));
1068                 return PTR_ERR(update);
1069         }
1070
1071         rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
1072                             buf, pos, update->dur_batchid);
1073
1074         return rc;
1075
1076 }
1077
1078 /**
1079  * Implementation of dt_body_operations::dbo_write
1080  *
1081  * Return the buffer size. In DNE phase I, remote updates
1082  * are actually executed during transaction start, the buffer has
1083  * already been written when this method is being called.
1084  *
1085  * \param[in] env       execution environment
1086  * \param[in] dt        object to be written
1087  * \param[in] buf       buffer to write which includes an embedded size field
1088  * \param[in] pos       offet in the object to start writing at
1089  * \param[in] th        transaction handle
1090  * \param[in] capa      capability of the write (not yet implemented)
1091  * \param[in] ignore_quota quota enforcement for this write
1092  *
1093  * \retval              the buffer size in bytes.
1094  */
1095 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1096                             const struct lu_buf *buf, loff_t *pos,
1097                             struct thandle *handle,
1098                             struct lustre_capa *capa, int ignore_quota)
1099 {
1100         *pos += buf->lb_len;
1101         return buf->lb_len;
1102 }
1103
1104 /* These body operation will be used to write symlinks during migration etc */
1105 struct dt_body_operations osp_md_body_ops = {
1106         .dbo_declare_write      = osp_md_declare_write,
1107         .dbo_write              = osp_md_write,
1108 };