o
    (yi@                     @   sF  d dl mZ d dlZd dlZd dlZd dlZd dlmZmZmZ d dl	m
Z
 d dlZd dlmZ ddlmZmZmZmZmZmZmZmZmZ ddlmZmZmZmZ dd	lmZ dd
l m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ee'Z(e
  dZ)dZ*ej+,ej+-e.dZ/dZ0dZ1dZ2dZ3dd Z4dd Z5dd Z6ej7dddddZ8dS )     )	getLoggerN)datetimetimezone	timedelta)load_dotenv
celery_app   )	sanitize_col_namesave_watermarkDB_PATHget_system_stateset_system_staterecreate_deals_table create_deals_table_if_not_exists recreate_user_restrictions_tableget_deal_count)fetch_deals_for_dealsfetch_product_batchvalidate_asinfetch_seller_data)TokenManager)_process_single_dealclean_numeric_values)get_seller_info_for_single_deal)clear_analysis_cachez2.9-persistent-statedealszheaders.json   backfill_deals_locki / zbackfill_state.jsonc                 C   s(   t dddtjd}|t| d }| S )zFConverts Keepa time (minutes since 2011-01-01) to ISO 8601 UTC string.i  r	   )tzinfo)minutes)r   r   utcr   	isoformat)keepa_minuteskeepa_epoch	dt_object r&   1/var/www/agentarbitrage/keepa_deals/backfiller.py_convert_keepa_time_to_iso*   s   r(   c               
   C   s  t d} | dur!zt| W S  ty    td|  d Y dS w tjtrt	d z3t
td#}t|}|dd}t	d	| d
 t| |W  d   W S 1 sXw   Y  W dS  tjttfy~ } ztd|  W Y d}~dS d}~ww dS )z
    Loads the last completed page from the system_state table.
    Migrates from legacy JSON file if DB entry is missing.
    backfill_pageNzInvalid backfill_page in DB: z. Defaulting to 0.r   z5Backfill state missing in DB. Checking legacy file...rlast_completed_pagez"Found legacy backfill state: page z. Migrating to DB.z%Error loading legacy backfill state: )r   int
ValueErrorloggererrorospathexistsSTATE_FILE_LEGACYinfoopenjsonloadgetsave_backfill_stateJSONDecodeErrorFileNotFoundError)valfdatapageer&   r&   r'   load_backfill_state0   s2   


$
rA   c                 C   s$   t dt|  td|  d dS )z8Saves the last completed page to the system_state table.r)   z/--- Backfill state saved. Last completed page:  ---N)r   strr.   r4   )page_numberr&   r&   r'   r9   N   s   r9   z%keepa_deals.backfiller.backfill_deals)nameFc           .         s  t   | rJtd td tjtr?ztt tdt d W n t	y> } zt
d|  W Y d }~nd }~ww t  t  td tjtjj}|jttd}|jdd	sft
d
 d S zzatd t  td}td}|std W W |  td d S t|}|  t }td| d 	 tddkrttdd}t  }	|	|krtd|	 d| d ntd| d t!||d|d\}
}}|"| |
rd|
vs|
d d std nd d! |
d d D }td"t#| d#| d |dkr&|r&|d $d$}|r&t%|}t&| t'dt#|t(D ]}|||t(  }|s>q.td%|t( d&  d't#|t( d& t(  d(| d d)d! |D }t)  z.t*+t,}|- }d*.d+t#| }|/d,t0 d-| d.| d/d0 |1 D  |2  W n t3y } zt
d1|  W Y d }~nd }~ww  fd2d!|D }i }|rtd3t#| d4 |4d5t#|  t5||d6d&d5d7\}}}}|r|"| |rd8|v r|6d9d: |d8 D  t7 }|rEtd;t#| d< |4d&t#|  |4d5t#|  t5||d6d&d5d7\}}}}|r1|"| |rEd8|v rE|6d=d: |d8 D  td>t#| d? g }|D ]B}|d@ }||vraqT|| }|6| t8|||}t9|||} | rt:| } t;<t=j>? | dA< dB| dC< |@|  tABd& qT|rtdDt#| dE d }!zzt*+t,}!|!- }"dFd! |D }#|#rdG.d+gt#|# }$d,t0 d-|$ d.}%|"/|%|# dHd0 |"1 D  t# }&t#|#|& }'tdI|' dJ|& dK tCtD}(tEF|(W d    n	1 s	w   Y  dLd! D })|)GdAdCg dG.d+gt#|) }dMd! |)D }*dNt0 dOdG.|* dP| d.}+fdQd!|D },|"H|+|, |!I  tdRt#| dS ddTlJmK}- dUd! |D }|rr|-jLdV|gdW |-LdX tdYt#| dZ W n t*jMy } ztjd[| dd\  d }~ww W |!r|!2  n	|!r|!2  w w tABd] q.t| |d&7 }tABd& qtd^ W n t3y } ztjd_| dd\ W Y d }~nd }~ww W |  td d S W |  td d S |  td w )`Nz6Reset requested. Clearing backfill state and database.r   zRemoved legacy state file .z$Could not remove legacy state file: z>Database has been reset (Deals and User Restrictions cleared).)timeoutF)blockingzD--- Task: backfill_deals is already running. Skipping execution. ---z$--- Task: backfill_deals started ---KEEPA_API_KEY	XAI_TOKENz KEEPA_API_KEY not set. Aborting.z+--- Task: backfill_deals lock released. ---z --- Resuming backfill from page rB   Tbackfill_limit_enabledtruebackfill_limit_counti  z'--- Artificial backfill limit reached (z >= z). Stopping backfill. ---zFetching page z of deals...)use_deal_settingstoken_managerr   drz)No more deals found. Pagination complete.c                 S   s   g | ]}t |d r|qS asin)r   r8   .0dr&   r&   r'   
<listcomp>   s    z"backfill_deals.<locals>.<listcomp>zFound z deals on page 
lastUpdatez--- Processing chunk r	   /z	 on page c                 S   s   g | ]}|d  qS rQ   r&   rS   r&   r&   r'   rV          ,?zSELECT ASIN FROM z WHERE ASIN IN ()c                 S      h | ]}|d  qS r   r&   )rT   r*   r&   r&   r'   	<setcomp>   rY   z!backfill_deals.<locals>.<setcomp>z Failed to check existing ASINs: c                    s   g | ]}| vr|qS r&   r&   )rT   a)existing_asinsr&   r'   rV          zFetching full history for z NEW deals...r   im  )dayshistoryoffersproductsc                 S      i | ]}|d  |qS rQ   r&   rT   pr&   r&   r'   
<dictcomp>       z"backfill_deals.<locals>.<dictcomp>zFetching lightweight stats for z EXISTING deals...c                 S   rg   rQ   r&   rh   r&   r&   r'   rj      rk   zFetched product data for z ASINs in chunk.rR   last_seen_utc
backfillersourcez
Upserting z. processed deals from chunk into the database.c                 S   s    g | ]}| d r| d qS ASINr8   rT   rowr&   r&   r'   rV     s     z, c                 S   r]   r^   r&   rr   r&   r&   r'   r_     rY   z--- Upsert Stats: z New Deals, z Refreshed Deals ---c                 S   s   g | ]}t |qS r&   )r
   rT   hr&   r&   r'   rV   "  rY   c                 S   s   g | ]}d | d qS )"r&   )rT   colr&   r&   r'   rV   &  rb   zINSERT OR REPLACE INTO z (z
) VALUES (c                    s6   g | ] t  fd dD  d df qS )c                 3   s    | ]}  |V  qd S )Nrq   rt   rs   r&   r'   	<genexpr>(  s    z,backfill_deals.<locals>.<listcomp>.<genexpr>rl   rn   )tupler8   )rT   )headers_datarx   r'   rV   (  s   6 zSuccessfully upserted z deals.r   c                 S   s   g | ]
}d |v r|d  qS ro   r&   rS   r&   r&   r'   rV   .  s    z4keepa_deals.sp_api_tasks.check_restriction_for_asins)argsz+keepa_deals.simple_task.update_recent_dealsz#--- Triggered downstream tasks for z ASINs. ---z&Database error while upserting deals: )exc_info<   z&--- Task: backfill_deals finished. ---z5An unexpected error occurred in backfill_deals task: )Nr   r.   r4   r9   r0   r1   r2   r3   removeOSErrorwarningr   r   redisRedisfrom_urlceleryconf
broker_urllockLOCK_KEYLOCK_TIMEOUTacquirer   getenvr/   releaser   sync_tokensrA   r   r,   r   r   update_after_calllenr8   r(   r   rangeDEALS_PER_CHUNKsetsqlite3connectr   cursorjoinexecute
TABLE_NAMEfetchallclose	Exceptionrequest_permission_for_callr   updatelistr   r   r   r   nowr   r!   r"   appendtimesleepr5   HEADERS_PATHr6   r7   extendexecutemanycommitworkerr   	send_taskError).resetr@   redis_clientr   api_keyxai_api_keyrO   r?   limit_countcurrent_countdeal_response_tokens_leftdeals_on_page	newest_tswm_isoichunk_deals	all_asins
conn_checkc_checkplaceholders	new_asinsall_fetched_products	prod_respt_leftexisting_listrows_to_upsertdealrR   product_dataseller_data_cacheprocessed_rowconnr   asin_list_upsertplaceholders_checkquery_checkcount_refreshed	count_newr=   
db_columnsquoted_columnsquerydata_to_insertr   r&   )ra   r{   r'   backfill_dealsS   sP  







 J 


6
#




 
  > r   )F)9loggingr   r0   r6   r   r   r   r   r   dotenvr   r   r   r   r   db_utilsr
   r   r   r   r   r   r   r   r   	keepa_apir   r   r   r   rO   r   
processingr   r   seller_infor   stable_calculationsr   __name__r.   BACKFILLER_VERSIONr   r1   r   dirname__file__r   r   r   r   r3   r(   rA   r9   taskr   r&   r&   r&   r'   <module>   s:    ,
