2020from taskiq.compat import model_dump
2121from taskiq.exceptions import SendTaskError
2222from taskiq.message import TaskiqMessage
23+ from taskiq.scheduler.created_schedule import CreatedSchedule
2324from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
2425from taskiq.task import AsyncTaskiqTask
2526from taskiq.utils import maybe_awaitable
@@ -146,13 +147,13 @@ async def kiq(
146147 result_backend=self.broker.result_backend,
147148 )
148149
149- async def schedule_cron (
150+ async def schedule_by_cron (
150151 self,
151152 source: "ScheduleSource",
152153 cron: Union[str, "CronSpec"],
153154 *args: _FuncParams.args,
154155 **kwargs: _FuncParams.kwargs,
155- ) -> None :
156+ ) -> CreatedSchedule[_ReturnType] :
156157 """
157158 Function to schedule task with cron.
158159
@@ -161,34 +162,36 @@ async def schedule_cron(
161162 :param args: function's args.
162163 :param cron_offset: cron offset.
163164 :param kwargs: function's kwargs.
165+
166+ :return: schedule id.
164167 """
168+ schedule_id = self.broker.id_generator()
165169 message = self._prepare_message(*args, **kwargs)
166170 cron_offset = None
167171 if isinstance(cron, CronSpec):
168172 cron_str = cron.to_cron()
169173 cron_offset = cron.offset
170174 else:
171175 cron_str = cron
172- await maybe_awaitable(
173- source.add_schedule(
174- ScheduledTask(
175- task_name=message.task_name,
176- labels=message.labels,
177- args=message.args,
178- kwargs=message.kwargs,
179- cron=cron_str,
180- cron_offset=cron_offset,
181- ),
182- ),
176+ scheduled = ScheduledTask(
177+ schedule_id=schedule_id,
178+ task_name=message.task_name,
179+ labels=message.labels,
180+ args=message.args,
181+ kwargs=message.kwargs,
182+ cron=cron_str,
183+ cron_offset=cron_offset,
183184 )
185+ await source.add_schedule(scheduled)
186+ return CreatedSchedule(self, source, scheduled)
184187
185- async def schedule_time (
188+ async def schedule_by_time (
186189 self,
187190 source: "ScheduleSource",
188191 time: datetime,
189192 *args: _FuncParams.args,
190193 **kwargs: _FuncParams.kwargs,
191- ) -> None :
194+ ) -> CreatedSchedule[_ReturnType] :
192195 """
193196 Function to schedule task to run at specific time.
194197
@@ -197,18 +200,18 @@ async def schedule_time(
197200 :param args: function's args.
198201 :param kwargs: function's kwargs.
199202 """
203+ schedule_id = self.broker.id_generator()
200204 message = self._prepare_message(*args, **kwargs)
201- await maybe_awaitable(
202- source.add_schedule(
203- ScheduledTask(
204- task_name=message.task_name,
205- labels=message.labels,
206- args=message.args,
207- kwargs=message.kwargs,
208- time=time,
209- ),
210- ),
205+ scheduled = ScheduledTask(
206+ schedule_id=schedule_id,
207+ task_name=message.task_name,
208+ labels=message.labels,
209+ args=message.args,
210+ kwargs=message.kwargs,
211+ time=time,
211212 )
213+ await source.add_schedule(scheduled)
214+ return CreatedSchedule(self, source, scheduled)
212215
213216 @classmethod
214217 def _prepare_arg(cls, arg: Any) -> Any:
0 commit comments