Whamcloud - gitweb
LU-3534 update: change sync updates to async updates
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include "osp_internal.h"
70
71 /**
72  * The argument for the interpreter callback of osp request.
73  */
74 struct osp_update_args {
75         struct osp_update_request *oaua_update;
76         atomic_t                 *oaua_count;
77         wait_queue_head_t        *oaua_waitq;
78         bool                      oaua_flow_control;
79 };
80
81 /**
82  * Call back for each update request.
83  */
84 struct osp_update_callback {
85         /* list in the osp_update_request::our_cb_items */
86         struct list_head                 ouc_list;
87
88         /* The target of the async update request. */
89         struct osp_object               *ouc_obj;
90
91         /* The data used by or_interpreter. */
92         void                            *ouc_data;
93
94         /* The interpreter function called after the async request handled. */
95         osp_update_interpreter_t        ouc_interpreter;
96 };
97
98 static struct object_update_request *object_update_request_alloc(size_t size)
99 {
100         struct object_update_request *ourq;
101
102         OBD_ALLOC_LARGE(ourq, size);
103         if (ourq == NULL)
104                 return ERR_PTR(-ENOMEM);
105
106         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
107         ourq->ourq_count = 0;
108
109         return ourq;
110 }
111
112 static void object_update_request_free(struct object_update_request *ourq,
113                                        size_t ourq_size)
114 {
115         if (ourq != NULL)
116                 OBD_FREE_LARGE(ourq, ourq_size);
117 }
118
119 /**
120  * Allocate and initialize osp_update_request
121  *
122  * osp_update_request is being used to track updates being executed on
123  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
124  * and increased if needed.
125  *
126  * \param [in] dt       dt device
127  *
128  * \retval              osp_update_request being allocated if succeed
129  * \retval              ERR_PTR(errno) if failed
130  */
131 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
132 {
133         struct osp_update_request *osp_update_req;
134         struct object_update_request *ourq;
135
136         OBD_ALLOC_PTR(osp_update_req);
137         if (osp_update_req == NULL)
138                 return ERR_PTR(-ENOMEM);
139
140         ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
141         if (IS_ERR(ourq)) {
142                 OBD_FREE_PTR(osp_update_req);
143                 return ERR_CAST(ourq);
144         }
145
146         osp_update_req->our_req = ourq;
147         osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
148
149         INIT_LIST_HEAD(&osp_update_req->our_cb_items);
150         INIT_LIST_HEAD(&osp_update_req->our_list);
151
152         return osp_update_req;
153 }
154
155 void osp_update_request_destroy(struct osp_update_request *our)
156 {
157         if (our == NULL)
158                 return;
159
160         object_update_request_free(our->our_req,
161                                    our->our_req_size);
162         OBD_FREE_PTR(our);
163 }
164
165 static void
166 object_update_request_dump(const struct object_update_request *ourq,
167                            unsigned int mask)
168 {
169         unsigned int i;
170         size_t total_size = 0;
171
172         for (i = 0; i < ourq->ourq_count; i++) {
173                 struct object_update    *update;
174                 size_t                  size = 0;
175
176                 update = object_update_request_get(ourq, i, &size);
177                 LASSERT(update != NULL);
178                 CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
179                        "params = %d batchid = "LPU64" size = %zu\n",
180                        i, PFID(&update->ou_fid),
181                        update_op_str(update->ou_type),
182                        update->ou_master_index, update->ou_params_count,
183                        update->ou_batchid, size);
184
185                 total_size += size;
186         }
187
188         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
189                ourq->ourq_magic, ourq->ourq_count, total_size);
190 }
191
192 static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
193 {
194         struct dt_txn_commit_cb *dcb;
195         struct dt_txn_commit_cb *tmp;
196
197         /* call per-transaction stop callbacks if any */
198         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
199                                  dcb_linkage) {
200                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
201                          "commit callback entry: magic=%x name='%s'\n",
202                          dcb->dcb_magic, dcb->dcb_name);
203                 list_del_init(&dcb->dcb_linkage);
204                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
205         }
206 }
207
208 /**
209  * Allocate an osp request and initialize it with the given parameters.
210  *
211  * \param[in] obj               pointer to the operation target
212  * \param[in] data              pointer to the data used by the interpreter
213  * \param[in] interpreter       pointer to the interpreter function
214  *
215  * \retval                      pointer to the asychronous request
216  * \retval                      NULL if the allocation failed
217  */
218 static struct osp_update_callback *
219 osp_update_callback_init(struct osp_object *obj, void *data,
220                          osp_update_interpreter_t interpreter)
221 {
222         struct osp_update_callback *ouc;
223
224         OBD_ALLOC_PTR(ouc);
225         if (ouc == NULL)
226                 return NULL;
227
228         lu_object_get(osp2lu_obj(obj));
229         INIT_LIST_HEAD(&ouc->ouc_list);
230         ouc->ouc_obj = obj;
231         ouc->ouc_data = data;
232         ouc->ouc_interpreter = interpreter;
233
234         return ouc;
235 }
236
237 /**
238  * Destroy the osp_update_callback.
239  *
240  * \param[in] env       pointer to the thread context
241  * \param[in] ouc       pointer to osp_update_callback
242  */
243 static void osp_update_callback_fini(const struct lu_env *env,
244                                      struct osp_update_callback *ouc)
245 {
246         LASSERT(list_empty(&ouc->ouc_list));
247
248         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
249         OBD_FREE_PTR(ouc);
250 }
251
252 /**
253  * Interpret the packaged OUT RPC results.
254  *
255  * For every packaged sub-request, call its registered interpreter function.
256  * Then destroy the sub-request.
257  *
258  * \param[in] env       pointer to the thread context
259  * \param[in] req       pointer to the RPC
260  * \param[in] arg       pointer to data used by the interpreter
261  * \param[in] rc        the RPC return value
262  *
263  * \retval              0 for success
264  * \retval              negative error number on failure
265  */
266 static int osp_update_interpret(const struct lu_env *env,
267                                 struct ptlrpc_request *req, void *arg, int rc)
268 {
269         struct object_update_reply      *reply  = NULL;
270         struct osp_update_args          *oaua   = arg;
271         struct osp_update_request       *our = oaua->oaua_update;
272         struct osp_thandle              *oth;
273         struct osp_update_callback      *ouc;
274         struct osp_update_callback      *next;
275         int                              count  = 0;
276         int                              index  = 0;
277         int                              rc1    = 0;
278
279         ENTRY;
280
281         if (our == NULL)
282                 RETURN(0);
283
284         oaua->oaua_update = NULL;
285         oth = our->our_th;
286         if (oaua->oaua_flow_control) {
287                 struct osp_device *osp;
288
289                 LASSERT(oth != NULL);
290                 osp = dt2osp_dev(oth->ot_super.th_dev);
291                 obd_put_request_slot(&osp->opd_obd->u.cli);
292         }
293
294         /* Unpack the results from the reply message. */
295         if (req->rq_repmsg != NULL) {
296                 reply = req_capsule_server_sized_get(&req->rq_pill,
297                                                      &RMF_OUT_UPDATE_REPLY,
298                                                      OUT_UPDATE_REPLY_SIZE);
299                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
300                         rc1 = -EPROTO;
301                 else
302                         count = reply->ourp_count;
303         } else {
304                 rc1 = rc;
305         }
306
307         list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
308                 list_del_init(&ouc->ouc_list);
309
310                 /* The peer may only have handled some requests (indicated
311                  * by the 'count') in the packaged OUT RPC, we can only get
312                  * results for the handled part. */
313                 if (index < count && reply->ourp_lens[index] > 0) {
314                         struct object_update_result *result;
315
316                         result = object_update_result_get(reply, index, NULL);
317                         if (result == NULL)
318                                 rc1 = -EPROTO;
319                         else
320                                 rc1 = result->our_rc;
321                 } else {
322                         rc1 = rc;
323                         if (unlikely(rc1 == 0))
324                                 rc1 = -EINVAL;
325                 }
326
327                 if (ouc->ouc_interpreter != NULL)
328                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
329                                              ouc->ouc_data, index, rc1);
330
331                 osp_update_callback_fini(env, ouc);
332                 index++;
333         }
334
335         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
336                 wake_up_all(oaua->oaua_waitq);
337
338         if (oth != NULL) {
339                 /* oth and osp_update_requests will be destoryed in
340                  * osp_thandle_put */
341                 osp_trans_stop_cb(oth, rc);
342                 osp_thandle_put(oth);
343         } else {
344                 osp_update_request_destroy(our);
345         }
346
347         RETURN(0);
348 }
349
350 /**
351  * Pack all the requests in the shared asynchronous idempotent request queue
352  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
353  *
354  * \param[in] env       pointer to the thread context
355  * \param[in] osp       pointer to the OSP device
356  * \param[in] our       pointer to the shared queue
357  *
358  * \retval              0 for success
359  * \retval              negative error number on failure
360  */
361 int osp_unplug_async_request(const struct lu_env *env,
362                              struct osp_device *osp,
363                              struct osp_update_request *our)
364 {
365         struct osp_update_args  *args;
366         struct ptlrpc_request   *req = NULL;
367         int                      rc;
368
369         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
370                                  our->our_req, &req);
371         if (rc != 0) {
372                 struct osp_update_callback *ouc;
373                 struct osp_update_callback *next;
374
375                 list_for_each_entry_safe(ouc, next,
376                                          &our->our_cb_items, ouc_list) {
377                         list_del_init(&ouc->ouc_list);
378                         if (ouc->ouc_interpreter != NULL)
379                                 ouc->ouc_interpreter(env, NULL, NULL,
380                                                      ouc->ouc_obj,
381                                                      ouc->ouc_data, 0, rc);
382                         osp_update_callback_fini(env, ouc);
383                 }
384                 osp_update_request_destroy(our);
385         } else {
386                 args = ptlrpc_req_async_args(req);
387                 args->oaua_update = our;
388                 args->oaua_count = NULL;
389                 args->oaua_waitq = NULL;
390                 args->oaua_flow_control = false;
391                 req->rq_interpret_reply = osp_update_interpret;
392                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
393         }
394
395         return rc;
396 }
397
398 /**
399  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
400  * request queue - osp_device::opd_async_requests.
401  *
402  * If the osp_device::opd_async_requests is not NULL, then return it directly;
403  * otherwise create new osp_update_request and attach it to opd_async_requests.
404  *
405  * \param[in] osp       pointer to the OSP device
406  *
407  * \retval              pointer to the shared queue
408  * \retval              negative error number on failure
409  */
410 static struct osp_update_request *
411 osp_find_or_create_async_update_request(struct osp_device *osp)
412 {
413         struct osp_update_request *our = osp->opd_async_requests;
414
415         if (our != NULL)
416                 return our;
417
418         our = osp_update_request_create(&osp->opd_dt_dev);
419         if (IS_ERR(our))
420                 return our;
421
422         osp->opd_async_requests = our;
423
424         return our;
425 }
426
427 /**
428  * Insert an osp_update_callback into the osp_update_request.
429  *
430  * Insert an osp_update_callback to the osp_update_request. Usually each update
431  * in the osp_update_request will have one correspondent callback, and these
432  * callbacks will be called in rq_interpret_reply.
433  *
434  * \param[in] env               pointer to the thread context
435  * \param[in] obj               pointer to the operation target object
436  * \param[in] data              pointer to the data used by the interpreter
437  * \param[in] interpreter       pointer to the interpreter function
438  *
439  * \retval                      0 for success
440  * \retval                      negative error number on failure
441  */
442 int osp_insert_update_callback(const struct lu_env *env,
443                                struct osp_update_request *our,
444                                struct osp_object *obj, void *data,
445                                osp_update_interpreter_t interpreter)
446 {
447         struct osp_update_callback  *ouc;
448
449         ouc = osp_update_callback_init(obj, data, interpreter);
450         if (ouc == NULL)
451                 RETURN(-ENOMEM);
452
453         list_add_tail(&ouc->ouc_list, &our->our_cb_items);
454
455         return 0;
456 }
457
458 /**
459  * Insert an asynchronous idempotent request to the shared request queue that
460  * is attached to the osp_device.
461  *
462  * This function generates a new osp_async_request with the given parameters,
463  * then tries to insert the request into the osp_device-based shared request
464  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
465  * the shared queue firstly, and then re-tries.
466  *
467  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
468  *       osp_insert_async_request call from others.
469  *
470  * \param[in] env               pointer to the thread context
471  * \param[in] op                operation type, see 'enum update_type'
472  * \param[in] obj               pointer to the operation target
473  * \param[in] count             array size of the subsequent \a lens and \a bufs
474  * \param[in] lens              buffer length array for the subsequent \a bufs
475  * \param[in] bufs              the buffers to compose the request
476  * \param[in] data              pointer to the data used by the interpreter
477  * \param[in] interpreter       pointer to the interpreter function
478  *
479  * \retval                      0 for success
480  * \retval                      negative error number on failure
481  */
482 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
483                              struct osp_object *obj, int count,
484                              __u16 *lens, const void **bufs, void *data,
485                              osp_update_interpreter_t interpreter)
486 {
487         struct osp_device               *osp;
488         struct osp_update_request       *our;
489         struct object_update            *object_update;
490         size_t                          max_update_size;
491         struct object_update_request    *ureq;
492         int                             rc = 0;
493         ENTRY;
494
495         osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
496         our = osp_find_or_create_async_update_request(osp);
497         if (IS_ERR(our))
498                 RETURN(PTR_ERR(our));
499
500 again:
501         ureq = our->our_req;
502         max_update_size = our->our_req_size - object_update_request_size(ureq);
503
504         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
505         rc = out_update_pack(env, object_update, max_update_size, op,
506                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
507         /* The queue is full. */
508         if (rc == -E2BIG) {
509                 osp->opd_async_requests = NULL;
510                 mutex_unlock(&osp->opd_async_requests_mutex);
511
512                 rc = osp_unplug_async_request(env, osp, our);
513                 mutex_lock(&osp->opd_async_requests_mutex);
514                 if (rc != 0)
515                         RETURN(rc);
516
517                 our = osp_find_or_create_async_update_request(osp);
518                 if (IS_ERR(our))
519                         RETURN(PTR_ERR(our));
520
521                 goto again;
522         } else {
523                 if (rc < 0)
524                         RETURN(rc);
525
526                 ureq->ourq_count++;
527         }
528
529         rc = osp_insert_update_callback(env, our, obj, data, interpreter);
530
531         RETURN(rc);
532 }
533
534 int osp_trans_update_request_create(struct thandle *th)
535 {
536         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
537         struct osp_update_request       *our;
538
539         if (oth->ot_our != NULL)
540                 return 0;
541
542         our = osp_update_request_create(th->th_dev);
543         if (IS_ERR(our)) {
544                 th->th_result = PTR_ERR(our);
545                 return PTR_ERR(our);
546         }
547
548         oth->ot_our = our;
549         our->our_th = oth;
550
551         if (oth->ot_super.th_sync)
552                 oth->ot_our->our_flags |= UPDATE_FL_SYNC;
553
554         return 0;
555 }
556
557 void osp_thandle_destroy(struct osp_thandle *oth)
558 {
559         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
560         LASSERT(list_empty(&oth->ot_commit_dcb_list));
561         LASSERT(list_empty(&oth->ot_stop_dcb_list));
562         if (oth->ot_our != NULL)
563                 osp_update_request_destroy(oth->ot_our);
564         OBD_FREE_PTR(oth);
565 }
566
567 /**
568  * The OSP layer dt_device_operations::dt_trans_create() interface
569  * to create a transaction.
570  *
571  * There are two kinds of transactions that will involve OSP:
572  *
573  * 1) If the transaction only contains the updates on remote server
574  *    (MDT or OST), such as re-generating the lost OST-object for
575  *    LFSCK, then it is a remote transaction. For remote transaction,
576  *    the upper layer caller (such as the LFSCK engine) will call the
577  *    dt_trans_create() (with the OSP dt_device as the parameter),
578  *    then the call will be directed to the osp_trans_create() that
579  *    creates the transaction handler and returns it to the caller.
580  *
581  * 2) If the transcation contains both local and remote updates,
582  *    such as cross MDTs create under DNE mode, then the upper layer
583  *    caller will not trigger osp_trans_create(). Instead, it will
584  *    call dt_trans_create() on other dt_device, such as LOD that
585  *    will generate the transaction handler. Such handler will be
586  *    used by the whole transaction in subsequent sub-operations.
587  *
588  * \param[in] env       pointer to the thread context
589  * \param[in] d         pointer to the OSP dt_device
590  *
591  * \retval              pointer to the transaction handler
592  * \retval              negative error number on failure
593  */
594 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
595 {
596         struct osp_thandle              *oth;
597         struct thandle                  *th = NULL;
598         ENTRY;
599
600         OBD_ALLOC_PTR(oth);
601         if (unlikely(oth == NULL))
602                 RETURN(ERR_PTR(-ENOMEM));
603
604         oth->ot_magic = OSP_THANDLE_MAGIC;
605         th = &oth->ot_super;
606         th->th_dev = d;
607         th->th_tags = LCT_TX_HANDLE;
608
609         atomic_set(&oth->ot_refcount, 1);
610         INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
611         INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
612
613         RETURN(th);
614 }
615
616 /**
617  * Prepare update request.
618  *
619  * Prepare OUT update ptlrpc request, and the request usually includes
620  * all of updates (stored in \param ureq) from one operation.
621  *
622  * \param[in] env       execution environment
623  * \param[in] imp       import on which ptlrpc request will be sent
624  * \param[in] ureq      hold all of updates which will be packed into the req
625  * \param[in] reqp      request to be created
626  *
627  * \retval              0 if preparation succeeds.
628  * \retval              negative errno if preparation fails.
629  */
630 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
631                         const struct object_update_request *ureq,
632                         struct ptlrpc_request **reqp)
633 {
634         struct ptlrpc_request           *req;
635         struct object_update_request    *tmp;
636         size_t                          ureq_len;
637         int                             rc;
638         ENTRY;
639
640         object_update_request_dump(ureq, D_INFO);
641         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
642         if (req == NULL)
643                 RETURN(-ENOMEM);
644
645         ureq_len = object_update_request_size(ureq);
646         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
647                              ureq_len);
648
649         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
650         if (rc != 0) {
651                 ptlrpc_req_finished(req);
652                 RETURN(rc);
653         }
654
655         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
656                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
657
658         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
659         memcpy(tmp, ureq, ureq_len);
660
661         ptlrpc_request_set_replen(req);
662         req->rq_request_portal = OUT_PORTAL;
663         req->rq_reply_portal = OSC_REPLY_PORTAL;
664         *reqp = req;
665
666         RETURN(rc);
667 }
668
669 /**
670  * Send update RPC.
671  *
672  * Send update request to the remote MDT synchronously.
673  *
674  * \param[in] env       execution environment
675  * \param[in] imp       import on which ptlrpc request will be sent
676  * \param[in] our       hold all of updates which will be packed into the req
677  * \param[in] reqp      request to be created
678  *
679  * \retval              0 if RPC succeeds.
680  * \retval              negative errno if RPC fails.
681  */
682
683 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
684                     struct osp_update_request *our,
685                     struct ptlrpc_request **reqp)
686 {
687         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
688         struct ptlrpc_request   *req = NULL;
689         int                     rc;
690         ENTRY;
691
692         rc = osp_prep_update_req(env, imp, our->our_req, &req);
693         if (rc != 0)
694                 RETURN(rc);
695
696         /* This will only be called with read-only update, and these updates
697          * might be used to retrieve update log during recovery process, so
698          * it will be allowed to send during recovery process */
699         req->rq_allow_replay = 1;
700
701         /* Note: some dt index api might return non-zero result here, like
702          * osd_index_ea_lookup, so we should only check rc < 0 here */
703         rc = ptlrpc_queue_wait(req);
704         if (rc < 0) {
705                 ptlrpc_req_finished(req);
706                 our->our_rc = rc;
707                 RETURN(rc);
708         }
709
710         if (reqp != NULL) {
711                 *reqp = req;
712                 RETURN(rc);
713         }
714
715         our->our_rc = rc;
716
717         ptlrpc_req_finished(req);
718
719         RETURN(rc);
720 }
721
722 /**
723  * Add commit callback to transaction.
724  *
725  * Add commit callback to the osp thandle, which will be called
726  * when the thandle is committed remotely.
727  *
728  * \param[in] th        the thandle
729  * \param[in] dcb       commit callback structure
730  *
731  * \retval              only return 0 for now.
732  */
733 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
734 {
735         struct osp_thandle *oth = thandle_to_osp_thandle(th);
736
737         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
738         LASSERT(&dcb->dcb_func != NULL);
739         if (dcb->dcb_flags & DCB_TRANS_STOP)
740                 list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
741         else
742                 list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
743         return 0;
744 }
745
746 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
747 {
748         struct dt_txn_commit_cb *dcb;
749         struct dt_txn_commit_cb *tmp;
750
751         LASSERT(atomic_read(&oth->ot_refcount) > 0);
752         /* call per-transaction callbacks if any */
753         list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
754                                  dcb_linkage) {
755                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
756                          "commit callback entry: magic=%x name='%s'\n",
757                          dcb->dcb_magic, dcb->dcb_name);
758                 list_del_init(&dcb->dcb_linkage);
759                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
760         }
761 }
762
763 static void osp_request_commit_cb(struct ptlrpc_request *req)
764 {
765         struct thandle          *th = req->rq_cb_data;
766         struct osp_thandle      *oth;
767         __u64                   last_committed_transno = 0;
768         int                     result = req->rq_status;
769         ENTRY;
770
771         if (th == NULL)
772                 RETURN_EXIT;
773
774         oth = thandle_to_osp_thandle(th);
775         if (lustre_msg_get_last_committed(req->rq_repmsg))
776                 last_committed_transno =
777                         lustre_msg_get_last_committed(req->rq_repmsg);
778
779         if (last_committed_transno <
780                 req->rq_import->imp_peer_committed_transno)
781                 last_committed_transno =
782                         req->rq_import->imp_peer_committed_transno;
783
784         CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
785                req->rq_transno, last_committed_transno);
786
787         /* If the transaction is not really committed, mark result = 1 */
788         if (req->rq_transno != 0 &&
789             (req->rq_transno > last_committed_transno) && result == 0)
790                 result = 1;
791
792         osp_trans_commit_cb(oth, result);
793         req->rq_committed = 1;
794         osp_thandle_put(oth);
795         EXIT;
796 }
797
798 /**
799  * callback of osp transaction
800  *
801  * Call all of callbacks for this osp thandle. This will only be
802  * called in error handler path. In the normal processing path,
803  * these callback will be called in osp_request_commit_cb() and
804  * osp_update_interpret().
805  *
806  * \param [in] env      execution environment
807  * \param [in] oth      osp thandle
808  * \param [in] rc       result of the osp thandle
809  */
810 void osp_trans_callback(const struct lu_env *env,
811                         struct osp_thandle *oth, int rc)
812 {
813         struct osp_update_callback *ouc;
814         struct osp_update_callback *next;
815
816         if (oth->ot_our != NULL) {
817                 list_for_each_entry_safe(ouc, next,
818                                          &oth->ot_our->our_cb_items, ouc_list) {
819                         list_del_init(&ouc->ouc_list);
820                         if (ouc->ouc_interpreter != NULL)
821                                 ouc->ouc_interpreter(env, NULL, NULL,
822                                                      ouc->ouc_obj,
823                                                      ouc->ouc_data, 0, rc);
824                         osp_update_callback_fini(env, ouc);
825                 }
826         }
827         osp_trans_stop_cb(oth, rc);
828         osp_trans_commit_cb(oth, rc);
829 }
830
831 /**
832  * Send the request for remote updates.
833  *
834  * Send updates to the remote MDT. Prepare the request by osp_update_req
835  * and send them to remote MDT, for sync request, it will wait
836  * until the reply return, otherwise hand it to ptlrpcd.
837  *
838  * Please refer to osp_trans_create() for transaction type.
839  *
840  * \param[in] env               pointer to the thread context
841  * \param[in] osp               pointer to the OSP device
842  * \param[in] our               pointer to the osp_update_request
843  *
844  * \retval                      0 for success
845  * \retval                      negative error number on failure
846  */
847 static int osp_send_update_req(const struct lu_env *env,
848                                struct osp_device *osp,
849                                struct osp_update_request *our)
850 {
851         struct osp_update_args  *args;
852         struct ptlrpc_request   *req;
853         struct lu_device *top_device;
854         struct osp_thandle      *oth = our->our_th;
855         int     rc = 0;
856         ENTRY;
857
858         LASSERT(oth != NULL);
859         LASSERT(our->our_req_sent == 0);
860         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
861                                  our->our_req, &req);
862         if (rc != 0) {
863                 osp_trans_callback(env, oth, rc);
864                 RETURN(rc);
865         }
866
867         args = ptlrpc_req_async_args(req);
868         args->oaua_update = our;
869         osp_thandle_get(oth); /* hold for update interpret */
870         req->rq_interpret_reply = osp_update_interpret;
871         if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
872                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
873                         osp_trans_callback(env, oth, rc);
874                         osp_thandle_put(oth);
875                         GOTO(out, rc = -ENOTCONN);
876                 }
877
878                 rc = obd_get_request_slot(&osp->opd_obd->u.cli);
879                 if (rc != 0) {
880                         osp_trans_callback(env, oth, rc);
881                         osp_thandle_put(oth);
882                         GOTO(out, rc = -ENOTCONN);
883                 }
884                 args->oaua_flow_control = true;
885
886                 if (!osp->opd_connect_mdt) {
887                         down_read(&osp->opd_async_updates_rwsem);
888                         args->oaua_count = &osp->opd_async_updates_count;
889                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
890                         up_read(&osp->opd_async_updates_rwsem);
891                         atomic_inc(args->oaua_count);
892                 }
893
894                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
895                 req = NULL;
896         } else {
897                 osp_thandle_get(oth); /* hold for commit callback */
898                 req->rq_commit_cb = osp_request_commit_cb;
899                 req->rq_cb_data = &oth->ot_super;
900                 args->oaua_flow_control = false;
901
902                 /* If the transaction is created during MDT recoverying
903                  * process, it means this is an recovery update, we need
904                  * to let OSP send it anyway without checking recoverying
905                  * status, in case the other target is being recoveried
906                  * at the same time, and if we wait here for the import
907                  * to be recoveryed, it might cause deadlock */
908                 top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
909                 if (top_device->ld_obd->obd_recovering)
910                         req->rq_allow_replay = 1;
911
912                 osp_get_rpc_lock(osp);
913                 rc = ptlrpc_queue_wait(req);
914                 osp_put_rpc_lock(osp);
915                 if ((rc == -ENOMEM && req->rq_set == NULL) ||
916                     (req->rq_transno == 0 && !req->rq_committed)) {
917                         if (args->oaua_update != NULL) {
918                                 /* If osp_update_interpret is not being called,
919                                  * release the osp_thandle */
920                                 args->oaua_update = NULL;
921                                 osp_thandle_put(oth);
922                         }
923
924                         req->rq_cb_data = NULL;
925                         rc = rc == 0 ? req->rq_status : rc;
926                         osp_trans_callback(env, oth, rc);
927                         osp_thandle_put(oth);
928                         GOTO(out, rc);
929                 }
930         }
931 out:
932         if (req != NULL)
933                 ptlrpc_req_finished(req);
934
935         RETURN(rc);
936 }
937
938 /**
939  * Get local thandle for osp_thandle
940  *
941  * Get the local OSD thandle from the OSP thandle. Currently, there
942  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
943  * to update the object on local OSD device.
944  *
945  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
946  * we will get local thandle by thandle_get_sub_by_dt.
947  *
948  * If the osp_thandle is remote thandle (th_top == NULL, only used
949  * by LFSCK), then it will create a local thandle, and stop it in
950  * osp_trans_stop(). And this only happens on OSP for OST.
951  *
952  * These are temporary solution, once OSP accessing OSD object is
953  * being fixed properly, this function should be removed. XXX
954  *
955  * \param[in] env               pointer to the thread context
956  * \param[in] th                pointer to the transaction handler
957  * \param[in] dt                pointer to the OSP device
958  *
959  * \retval                      pointer to the local thandle
960  * \retval                      ERR_PTR(errno) if it fails.
961  **/
962 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
963                                         struct thandle *th,
964                                         struct osp_device *osp)
965 {
966         struct osp_thandle      *oth;
967         struct thandle          *local_th;
968
969         if (th->th_top != NULL)
970                 return thandle_get_sub_by_dt(env, th->th_top,
971                                              osp->opd_storage);
972
973         LASSERT(!osp->opd_connect_mdt);
974         oth = thandle_to_osp_thandle(th);
975         if (oth->ot_storage_th != NULL)
976                 return oth->ot_storage_th;
977
978         local_th = dt_trans_create(env, osp->opd_storage);
979         if (IS_ERR(local_th))
980                 return local_th;
981
982         oth->ot_storage_th = local_th;
983
984         return local_th;
985 }
986
987 /**
988  * Set version for the transaction
989  *
990  * Set the version for the transaction, then the osp RPC will be
991  * sent in the order of version, i.e. the transaction with lower
992  * version will be sent first.
993  *
994  * \param [in] oth      osp thandle to be set version.
995  *
996  * \retval              0 if set version succeeds
997  *                      negative errno if set version fails.
998  */
999 int osp_check_and_set_rpc_version(struct osp_thandle *oth)
1000 {
1001         struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
1002         struct osp_updates *ou = osp->opd_update;
1003
1004         if (ou == NULL)
1005                 return -EIO;
1006
1007         if (oth->ot_version != 0)
1008                 return 0;
1009
1010         spin_lock(&ou->ou_lock);
1011         oth->ot_version = ou->ou_version++;
1012         spin_unlock(&ou->ou_lock);
1013
1014         CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
1015                osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
1016
1017         return 0;
1018 }
1019
1020 /**
1021  * Get next OSP update request in the sending list
1022  * Get next OSP update request in the sending list by version number, next
1023  * request will be
1024  * 1. transaction which does not have a version number.
1025  * 2. transaction whose version == opd_rpc_version.
1026  *
1027  * \param [in] ou       osp update structure.
1028  * \param [out] ourp    the pointer holding the next update request.
1029  *
1030  * \retval              true if getting the next transaction.
1031  * \retval              false if not getting the next transaction.
1032  */
1033 static bool
1034 osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
1035 {
1036         struct osp_update_request *our;
1037         struct osp_update_request *tmp;
1038         bool                    got_req = false;
1039
1040         spin_lock(&ou->ou_lock);
1041         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1042                 LASSERT(our->our_th != NULL);
1043                 CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
1044                        our, our->our_th->ot_version, ou->ou_rpc_version);
1045                 if (our->our_th->ot_version == 0) {
1046                         list_del_init(&our->our_list);
1047                         *ourp = our;
1048                         got_req = true;
1049                         break;
1050                 }
1051
1052                 /* Find next osp_update_request in the list */
1053                 if (our->our_th->ot_version == ou->ou_rpc_version) {
1054                         list_del_init(&our->our_list);
1055                         *ourp = our;
1056                         got_req = true;
1057                         break;
1058                 }
1059         }
1060         spin_unlock(&ou->ou_lock);
1061
1062         return got_req;
1063 }
1064
1065 static void osp_update_rpc_version(struct osp_updates *ou,
1066                                    struct osp_thandle *oth)
1067 {
1068         if (oth->ot_version == 0)
1069                 return;
1070
1071         LASSERT(oth->ot_version == ou->ou_rpc_version);
1072         spin_lock(&ou->ou_lock);
1073         ou->ou_rpc_version++;
1074         spin_unlock(&ou->ou_lock);
1075 }
1076
1077 /**
1078  * Sending update thread
1079  *
1080  * Create thread to send update request to other MDTs, this thread will pull
1081  * out update request from the list in OSP by version number, i.e. it will
1082  * make sure the update request with lower version number will be sent first.
1083  *
1084  * \param[in] arg       hold the OSP device.
1085  *
1086  * \retval              0 if the thread is created successfully.
1087  * \retal               negative error if the thread is not created
1088  *                      successfully.
1089  */
1090 int osp_send_update_thread(void *arg)
1091 {
1092         struct lu_env           env;
1093         struct osp_device       *osp = arg;
1094         struct l_wait_info       lwi = { 0 };
1095         struct osp_updates      *ou = osp->opd_update;
1096         struct ptlrpc_thread    *thread = &osp->opd_update_thread;
1097         struct osp_update_request *our = NULL;
1098         int                     rc;
1099         ENTRY;
1100
1101         LASSERT(ou != NULL);
1102         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1103         if (rc < 0) {
1104                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1105                        rc);
1106                 RETURN(rc);
1107         }
1108
1109         thread->t_flags = SVC_RUNNING;
1110         wake_up(&thread->t_ctl_waitq);
1111         while (1) {
1112                 our = NULL;
1113                 l_wait_event(ou->ou_waitq,
1114                              !osp_send_update_thread_running(osp) ||
1115                              osp_get_next_request(ou, &our),
1116                              &lwi);
1117
1118                 if (!osp_send_update_thread_running(osp)) {
1119                         if (our != NULL && our->our_th != NULL) {
1120                                 osp_trans_callback(&env, our->our_th, -EINTR);
1121                                 osp_thandle_put(our->our_th);
1122                         }
1123                         break;
1124                 }
1125
1126                 if (our->our_req_sent == 0) {
1127                         if (our->our_th != NULL &&
1128                             our->our_th->ot_super.th_result != 0)
1129                                 osp_trans_callback(&env, our->our_th,
1130                                         our->our_th->ot_super.th_result);
1131                         else
1132                                 rc = osp_send_update_req(&env, osp, our);
1133                 }
1134
1135                 if (our->our_th != NULL) {
1136                         /* Update the rpc version */
1137                         osp_update_rpc_version(ou, our->our_th);
1138                         /* Balanced for thandle_get in osp_trans_trigger() */
1139                         osp_thandle_put(our->our_th);
1140                 }
1141         }
1142
1143         thread->t_flags = SVC_STOPPED;
1144         lu_env_fini(&env);
1145         wake_up(&thread->t_ctl_waitq);
1146
1147         RETURN(0);
1148 }
1149
1150 /**
1151  * Trigger the request for remote updates.
1152  *
1153  * Add the request to the sending list, and wake up osp update
1154  * sending thread.
1155  *
1156  * \param[in] env               pointer to the thread context
1157  * \param[in] osp               pointer to the OSP device
1158  * \param[in] oth               pointer to the transaction handler
1159  *
1160  */
1161 static void osp_trans_trigger(const struct lu_env *env,
1162                              struct osp_device *osp,
1163                              struct osp_thandle *oth)
1164 {
1165
1166         CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
1167                osp->opd_obd->obd_name, oth, oth->ot_version);
1168
1169         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
1170         osp_thandle_get(oth);
1171         LASSERT(oth->ot_our != NULL);
1172         spin_lock(&osp->opd_update->ou_lock);
1173         list_add_tail(&oth->ot_our->our_list,
1174                       &osp->opd_update->ou_list);
1175         spin_unlock(&osp->opd_update->ou_lock);
1176
1177         wake_up(&osp->opd_update->ou_waitq);
1178 }
1179
1180 /**
1181  * The OSP layer dt_device_operations::dt_trans_start() interface
1182  * to start the transaction.
1183  *
1184  * If the transaction is a remote transaction, then related remote
1185  * updates will be triggered in the osp_trans_stop().
1186  * Please refer to osp_trans_create() for transaction type.
1187  *
1188  * \param[in] env               pointer to the thread context
1189  * \param[in] dt                pointer to the OSP dt_device
1190  * \param[in] th                pointer to the transaction handler
1191  *
1192  * \retval                      0 for success
1193  * \retval                      negative error number on failure
1194  */
1195 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
1196                     struct thandle *th)
1197 {
1198         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
1199
1200         /* For remote thandle, if there are local thandle, start it here*/
1201         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
1202                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
1203                                       oth->ot_storage_th);
1204         return 0;
1205 }
1206
1207 /**
1208  * The OSP layer dt_device_operations::dt_trans_stop() interface
1209  * to stop the transaction.
1210  *
1211  * If the transaction is a remote transaction, related remote
1212  * updates will be triggered here via osp_trans_trigger().
1213  *
1214  * For synchronous mode update or any failed update, the request
1215  * will be destroyed explicitly when the osp_trans_stop().
1216  *
1217  * Please refer to osp_trans_create() for transaction type.
1218  *
1219  * \param[in] env               pointer to the thread context
1220  * \param[in] dt                pointer to the OSP dt_device
1221  * \param[in] th                pointer to the transaction handler
1222  *
1223  * \retval                      0 for success
1224  * \retval                      negative error number on failure
1225  */
1226 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
1227                    struct thandle *th)
1228 {
1229         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
1230         struct osp_update_request *our = oth->ot_our;
1231         struct osp_device        *osp = dt2osp_dev(dt);
1232         int                      rc = 0;
1233         ENTRY;
1234
1235         /* For remote transaction, if there is local storage thandle,
1236          * stop it first */
1237         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
1238                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
1239                               oth->ot_storage_th);
1240                 oth->ot_storage_th = NULL;
1241         }
1242
1243         if (our == NULL || our->our_req == NULL ||
1244             our->our_req->ourq_count == 0) {
1245                 osp_trans_callback(env, oth, th->th_result);
1246                 GOTO(out, rc = th->th_result);
1247         }
1248
1249         if (!osp->opd_connect_mdt) {
1250                 rc = osp_send_update_req(env, osp, oth->ot_our);
1251                 GOTO(out, rc);
1252         }
1253
1254         if (osp->opd_update == NULL ||
1255             !osp_send_update_thread_running(osp)) {
1256                 osp_trans_callback(env, oth, -EIO);
1257                 GOTO(out, rc = -EIO);
1258         }
1259
1260         if (th->th_sync) {
1261                 /* if th_sync is set, then it needs to be sent
1262                  * right away. Note: even thought the RPC has been
1263                  * sent, it still needs to be added to the sending
1264                  * list (see osp_trans_trigger()), so ou_rpc_version
1265                  * can be updated correctly. */
1266                 rc = osp_send_update_req(env, osp, our);
1267                 our->our_req_sent = 1;
1268         }
1269
1270         osp_trans_trigger(env, osp, oth);
1271 out:
1272         osp_thandle_put(oth);
1273
1274         RETURN(rc);
1275 }