4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
26 * lustre/osp/osp_trans.c
28 * Author: Di Wang <di.wang@intel.com>
29 * Author: Fan, Yong <fan.yong@intel.com>
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "osp_internal.h"
/*
 * Arguments stored in the async-args area of an update RPC.  They are
 * filled in by osp_unplug_async_update() / osp_trans_trigger() and read
 * back by osp_async_update_interpret() through its argument pointer.
 */
36 struct osp_async_update_args {
/* the update request carried by the RPC */
37 struct update_request *oaua_update;
/* one-bit flag; osp_trans_trigger() sets it to !!fc */
38 unsigned int oaua_fc:1;
/*
 * One callback registration queued on an update request: the object it
 * concerns and the interpreter to call once a result is known.
 *
 * NOTE(review): an oaui_data member is assigned in
 * osp_async_update_item_init() and passed to the interpreter, but its
 * declaration is not among the lines shown here.
 */
41 struct osp_async_update_item {
/* linkage on update->ur_cb_items */
42 struct list_head oaui_list;
/* object handed to the interpreter callback */
43 struct osp_object *oaui_obj;
/* callback, invoked as (env, reply, oaui_obj, oaui_data, index, rc) */
45 osp_async_update_interpterer_t oaui_interpterer;
/*
 * Set up a callback item for \a obj.
 *
 * \param obj		object the callback refers to
 * \param data		opaque pointer stored in the item and later passed
 *			to the interpreter
 * \param interpterer	callback stored in the item
 *
 * NOTE(review): the allocation of the item, the oaui_obj assignment and
 * the return statement are on lines not shown here — confirm against the
 * full file.
 */
48 static struct osp_async_update_item *
49 osp_async_update_item_init(struct osp_object *obj, void *data,
50 osp_async_update_interpterer_t interpterer)
52 struct osp_async_update_item *oaui;
/* paired with the lu_object_put() in osp_async_update_item_fini() */
58 lu_object_get(osp2lu_obj(obj));
59 INIT_LIST_HEAD(&oaui->oaui_list);
61 oaui->oaui_data = data;
62 oaui->oaui_interpterer = interpterer;
/*
 * Release a callback item.
 *
 * The item must already be unlinked from any list (asserted below).
 * lu_object_put() is called on the item's object, balancing the
 * lu_object_get() done in osp_async_update_item_init().
 *
 * NOTE(review): freeing of the item itself is not visible in these lines.
 */
67 static void osp_async_update_item_fini(const struct lu_env *env,
68 struct osp_async_update_item *oaui)
70 LASSERT(list_empty(&oaui->oaui_list));
72 lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
/*
 * Reply interpreter for asynchronous update RPCs; it is installed as
 * rq_interpret_reply by osp_unplug_async_update() and osp_trans_trigger().
 *
 * It walks update->ur_cb_items, determines a result code for each queued
 * item, calls the item's interpreter with it, finishes the item, and at
 * the end destroys the update request.
 *
 * NOTE(review): the remaining parameters, some local declarations, the
 * else-branches and the return statement are on lines not shown here; the
 * comments below only describe the statements that are visible.
 */
76 static int osp_async_update_interpret(const struct lu_env *env,
77 struct ptlrpc_request *req,
80 struct update_reply *reply = NULL;
/* the async args identify the update request this RPC was built from */
81 struct osp_async_update_args *oaua = arg;
82 struct update_request *update = oaua->oaua_update;
83 struct osp_async_update_item *oaui;
84 struct osp_async_update_item *next;
85 struct osp_device *osp = dt2osp_dev(update->ur_dt);
/*
 * Release opd_async_fc_sem, which osp_trans_stop() takes with
 * down_timeout() before triggering the RPC.
 * NOTE(review): whether this is guarded (e.g. by oaua_fc) depends on a
 * line not shown here.
 */
91 up(&osp->opd_async_fc_sem);
/* look at the reply if the RPC succeeded or a reply message exists anyway */
93 if (rc == 0 || req->rq_repmsg != NULL) {
94 reply = req_capsule_server_sized_get(&req->rq_pill,
/*
 * A missing reply or a version other than UPDATE_REPLY_V1 is handled
 * separately (that statement is not shown); the item count is taken
 * from the reply.
 */
97 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
100 count = reply->ur_count;
/* the _safe iterator is needed: every item is unlinked and finished here */
105 list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
106 list_del_init(&oaui->oaui_list);
/* use the reply slot only if it exists for this index and is non-empty */
107 if (index < count && reply->ur_lens[index] > 0) {
108 char *ptr = update_get_buf_internal(reply, index, NULL);
110 LASSERT(ptr != NULL);
/* the slot's buffer is read as a little-endian 32-bit result code */
112 rc1 = le32_to_cpu(*(int *)ptr);
/* NOTE(review): the statement controlled by this check is not shown */
115 if (unlikely(rc1 == 0))
119 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
120 oaui->oaui_data, index, rc1);
121 osp_async_update_item_fini(env, oaui);
/* all callbacks have run; the update request is no longer needed */
125 out_destroy_update_req(update);
/*
 * Build an RPC for \a update with out_prep_update_req() and hand it to
 * ptlrpcd, with osp_async_update_interpret() as the reply interpreter.
 *
 * \param env		execution environment
 * \param osp		OSP device whose client import is used for the RPC
 * \param update	update request to send
 *
 * NOTE(review): the condition choosing between the two paths below and the
 * return statement are on lines not shown here; osp_insert_async_update()
 * stores this function's result in rc.
 */
130 int osp_unplug_async_update(const struct lu_env *env,
131 struct osp_device *osp,
132 struct update_request *update)
134 struct osp_async_update_args *args;
135 struct ptlrpc_request *req = NULL;
138 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
139 update->ur_buf, UPDATE_BUFFER_SIZE, &req);
/*
 * Path without an RPC: every queued callback is unlinked, called with a
 * NULL reply, index 0 and rc, and finished; then the update request is
 * destroyed.  Passing rc to the callbacks suggests this is the
 * out_prep_update_req() failure path — TODO confirm.
 */
141 struct osp_async_update_item *oaui;
142 struct osp_async_update_item *next;
144 list_for_each_entry_safe(oaui, next,
145 &update->ur_cb_items, oaui_list) {
146 list_del_init(&oaui->oaui_list);
147 oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
148 oaui->oaui_data, 0, rc);
149 osp_async_update_item_fini(env, oaui);
151 out_destroy_update_req(update);
/* send path: the update must not be linked on any ur_list */
153 LASSERT(list_empty(&update->ur_list));
/*
 * NOTE(review): only oaua_update is assigned here, whereas
 * osp_trans_trigger() also sets oaua_fc.  Confirm that the async-args
 * area starts out zeroed so that oaua_fc reads as 0 in the interpreter.
 */
155 args = ptlrpc_req_async_args(req);
156 args->oaua_update = update;
157 req->rq_interpret_reply = osp_async_update_interpret;
158 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
164 /* with osp::opd_async_requests_mutex held */
/*
 * Return the async update request cached in osp->opd_async_requests; a new
 * one is obtained from out_create_update_req() and stored back into
 * osp->opd_async_requests.
 *
 * NOTE(review): the conditions around creation and caching, and the return
 * statement, are on lines not shown here.  osp_insert_async_update()
 * applies PTR_ERR() to the result, so an error pointer appears possible.
 */
165 struct update_request *
166 osp_find_or_create_async_update_request(struct osp_device *osp)
168 struct update_request *update = osp->opd_async_requests;
173 update = out_create_update_req(&osp->opd_dt_dev);
175 osp->opd_async_requests = update;
180 /* with osp::opd_async_requests_mutex held */
/*
 * Add operation \a op on \a obj to \a update and register a callback item
 * for it.
 *
 * \param env		execution environment
 * \param update	update request to insert into
 * \param op		operation code passed to out_insert_update()
 * \param obj		target object; its FID is passed to
 *			out_insert_update()
 * \param count, lens, bufs
 *			presumably the buffer description forwarded to
 *			out_insert_update() (those argument lines are not
 *			shown) — TODO confirm
 * \param data		opaque pointer stored in the callback item
 * \param interpterer	callback stored in the callback item
 *
 * NOTE(review): the conditions, labels and return statement are on lines
 * not shown here; the comments below describe only the visible statements.
 */
181 int osp_insert_async_update(const struct lu_env *env,
182 struct update_request *update, int op,
183 struct osp_object *obj, int count,
184 int *lens, const char **bufs, void *data,
185 osp_async_update_interpterer_t interpterer)
187 struct osp_async_update_item *oaui;
188 struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
192 oaui = osp_async_update_item_init(obj, data, interpterer);
197 rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
/*
 * Flush path: detach the current request from the device, drop
 * opd_async_requests_mutex while the request is sent via
 * osp_unplug_async_update(), then retake the mutex and obtain a
 * request again.
 * NOTE(review): the condition that selects this path is not shown.
 */
200 osp->opd_async_requests = NULL;
201 mutex_unlock(&osp->opd_async_requests_mutex);
203 rc = osp_unplug_async_update(env, osp, update);
204 mutex_lock(&osp->opd_async_requests_mutex);
208 update = osp_find_or_create_async_update_request(osp);
210 GOTO(out, rc = PTR_ERR(update));
/* queue the callback; osp_async_update_interpret() consumes it later */
216 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
/* NOTE(review): presumably the error-path cleanup of the unused item */
222 osp_async_update_item_fini(env, oaui);
228 * If the transaction creation goes to OSP, it means the update
229 * in this transaction only includes remote UPDATE. It is only
230 * used by LFSCK right now.
/*
 * Create a transaction handle together with its thandle_update, marked as
 * a remote-only transaction (tu_only_remote_trans = 1).
 *
 * NOTE(review): the allocations, the linking of tu to th, the "out" label
 * and the return statement are on lines not shown here.
 */
232 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
234 struct thandle *th = NULL;
235 struct thandle_update *tu = NULL;
239 if (unlikely(th == NULL))
240 GOTO(out, rc = -ENOMEM);
/* initialise the handle: tag, a reference count of 1, allocated size */
243 th->th_tags = LCT_TX_HANDLE;
244 atomic_set(&th->th_refc, 1);
245 th->th_alloc_size = sizeof(*th);
/* NOTE(review): presumably reached when allocating tu fails — check line not shown */
249 GOTO(out, rc = -ENOMEM);
251 INIT_LIST_HEAD(&tu->tu_remote_update_list);
252 tu->tu_only_remote_trans = 1;
/*
 * Send the updates collected in \a update for transaction \a th.
 *
 * For a remote-only transaction the update is unlinked from its list and
 * sent asynchronously through ptlrpcd with osp_async_update_interpret() as
 * the reply interpreter; otherwise out_remote_sync() is used.
 *
 * NOTE(review): the last parameter (referenced as "fc" below), the
 * conditions around request preparation and the return statement are on
 * lines not shown here.
 */
267 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
268 struct update_request *update, struct thandle *th,
271 struct thandle_update *tu = th->th_update;
276 /* If the transaction only includes remote update, it should
277 * still be asynchronous */
278 if (is_only_remote_trans(th)) {
279 struct osp_async_update_args *args;
280 struct ptlrpc_request *req;
282 list_del_init(&update->ur_list);
283 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
285 UPDATE_BUFFER_SIZE, &req);
/* hand the update and the normalised fc flag to the reply interpreter */
287 args = ptlrpc_req_async_args(req);
288 args->oaua_update = update;
289 args->oaua_fc = !!fc;
290 req->rq_interpret_reply =
291 osp_async_update_interpret;
292 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
/* NOTE(review): presumably the path where no request could be prepared */
294 out_destroy_update_req(update);
297 /* Before we support async update, the cross MDT transaction
298 * has to be synchronized */
300 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
/*
 * Transaction start hook for the OSP device \a dt.
 *
 * Looks up the update request belonging to this device and, unless the
 * transaction is remote-only or flagged to send after the local
 * transaction, triggers it right away.
 *
 * NOTE(review): the remaining parameter(s), the handling of a missing
 * update and the return statement are on lines not shown here.
 */
307 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
310 struct thandle_update *tu = th->th_update;
311 struct update_request *update;
317 /* Check whether there are updates related with this OSP */
318 update = out_find_update(tu, dt);
322 /* Note: some updates need to be sent before the local transaction,
323 * some need to be sent after the local transaction.
325 * If the transaction only includes remote updates, it will
326 * send updates to remote MDT in osp_trans_stop.
328 * If it is remote create, it will send the remote req after
329 * local transaction. i.e. create the object locally first,
330 * then insert the name entry.
332 * If it is remote unlink, it will send the remote req before
333 * the local transaction, i.e. delete the name entry remote
334 * first, then destroy the local object. */
335 if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
336 rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th, false);
/*
 * Transaction stop hook for the OSP device \a dt.
 *
 * Finds the update request for this device.  An update with nothing
 * buffered (ub_count == 0) is destroyed.  A remote-only transaction is
 * triggered here when th_result is 0; other transactions are triggered
 * here only when tu_sent_after_local_trans is set.
 *
 * NOTE(review): the remaining parameter(s), several branches, the labels
 * and the end of the function are not visible here; the comments describe
 * only the statements that are shown.
 */
341 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
344 struct thandle_update *tu = th->th_update;
345 struct update_request *update;
349 /* Check whether there are updates related with this OSP */
350 update = out_find_update(tu, dt);
351 if (update == NULL) {
352 if (!is_only_remote_trans(th))
/* nothing was buffered in this update: just release it */
357 if (update->ur_buf->ub_count == 0) {
358 out_destroy_update_req(update);
362 if (is_only_remote_trans(th)) {
363 if (th->th_result == 0) {
364 struct osp_device *osp = dt2osp_dev(th->th_dev);
/* do not send on an inactive or disconnected import; fail with -ENOTCONN */
367 if (!osp->opd_imp_active ||
368 osp->opd_got_disconnected) {
369 out_destroy_update_req(update);
370 GOTO(put, rc = -ENOTCONN);
373 /* Get the semaphore to guarantee it has
374 * free slot, which will be released via
375 * osp_async_update_interpret(). */
/* waits at most HZ jiffies; handling of a non-zero rc is not shown */
376 rc = down_timeout(&osp->opd_async_fc_sem, HZ);
379 rc = osp_trans_trigger(env, dt2osp_dev(dt),
/* NOTE(review): presumably gives the slot back when the trigger failed */
382 up(&osp->opd_async_fc_sem);
/* NOTE(review): presumably the th_result != 0 path — nothing is sent */
385 out_destroy_update_req(update);
388 if (tu->tu_sent_after_local_trans)
389 rc = osp_trans_trigger(env, dt2osp_dev(dt),
392 out_destroy_update_req(update);