-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Open
Description
Git provider
Other
System Info
Gitea: 1.25.1
Docker image: codiumai/pr-agent:0.31-gitea_app
Bug details
Error while getting files from PR:
'X-Gitea-Warning': 'token and access_token API authentication is deprecated and will be removed in gitea 1.23. Please use AuthorizationHeaderToken instead. Existing queries will continue to work but without authorization.'
Simple fix for /app/pr_agent/git_providers/gitea_provider.py (send the token in the Authorization header instead of the `?token=` query parameter):
--- v0.31/pr_agent/git_providers/gitea_provider.py 2025-11-28 14:23:21
+++ gitea-fix/pr_agent/git_providers/gitea_provider.py 2025-11-28 14:24:30
@@ -551,7 +551,7 @@
if not self.pr:
self.logger.error("Failed to get PR branch")
return ""
-
+
if not self.pr.head:
self.logger.error("PR head not found")
return ""
@@ -747,15 +747,15 @@
def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> str:
"""Get the diff content of a pull request using direct API call"""
try:
- token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+ auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+ header_params = {'Authorization': auth_token} if auth_token else {}
url = f'/repos/{owner}/{repo}/pulls/{pr_number}.diff'
- if token:
- url = f'{url}?token={token}'
response = self.api_client.call_api(
url,
'GET',
path_params={},
+ header_params=header_params,
response_type=None,
_return_http_data_only=False,
_preload_content=False
@@ -768,7 +768,7 @@
raw_data = response[0].read()
return raw_data.decode('utf-8')
else:
- error_msg = f"Unexpected response format received from API: {type(response)}"
+ error_msg = f"Unexpected response format: {type(response)}"
self.logger.error(error_msg)
raise RuntimeError(error_msg)
@@ -780,41 +780,21 @@
raise e
def get_pull_request(self, owner: str, repo: str, pr_number: int):
- """Get pull request details including description"""
- return self.repository.repo_get_pull_request(
- owner=owner,
- repo=repo,
- index=pr_number
- )
+ return self.repository.repo_get_pull_request(owner=owner, repo=repo, index=pr_number)
- def edit_pull_request(self, owner: str, repo: str, pr_number: int,title : str, body: str):
- """Edit pull request description"""
- body = {
- "body": body,
- "title" : title
- }
- return self.repository.repo_edit_pull_request(
- owner=owner,
- repo=repo,
- index=pr_number,
- body=body
- )
+ def edit_pull_request(self, owner: str, repo: str, pr_number: int, title : str, body: str):
+ body = {"body": body, "title" : title}
+ return self.repository.repo_edit_pull_request(owner=owner, repo=repo, index=pr_number, body=body)
def get_change_file_pull_request(self, owner: str, repo: str, pr_number: int):
- """Get changed files in the pull request"""
try:
- token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+ auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+ header_params = {'Authorization': auth_token} if auth_token else {}
url = f'/repos/{owner}/{repo}/pulls/{pr_number}/files'
- if token:
- url = f'{url}?token={token}'
response = self.api_client.call_api(
- url,
- 'GET',
- path_params={},
- response_type=None,
- _return_http_data_only=False,
- _preload_content=False
+ url, 'GET', path_params={}, header_params=header_params,
+ response_type=None, _return_http_data_only=False, _preload_content=False
)
if hasattr(response, 'data'):
@@ -825,174 +805,103 @@
raw_data = response[0].read()
diff_content = raw_data.decode('utf-8')
return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
-
return []
-
- except ApiException as e:
+ except Exception as e:
self.logger.error(f"Error getting changed files: {e}")
return []
- except Exception as e:
- self.logger.error(f"Unexpected error: {e}")
- return []
def get_languages(self, owner: str, repo: str):
- """Get programming languages used in the repository"""
try:
- token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+ auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+ header_params = {'Authorization': auth_token} if auth_token else {}
url = f'/repos/{owner}/{repo}/languages'
- if token:
- url = f'{url}?token={token}'
response = self.api_client.call_api(
- url,
- 'GET',
- path_params={},
- response_type=None,
- _return_http_data_only=False,
- _preload_content=False
+ url, 'GET', path_params={}, header_params=header_params,
+ response_type=None, _return_http_data_only=False, _preload_content=False
)
-
if hasattr(response, 'data'):
- raw_data = response.data.read()
- return json.loads(raw_data.decode('utf-8'))
+ return json.loads(response.data.read().decode('utf-8'))
elif isinstance(response, tuple):
- raw_data = response[0].read()
- return json.loads(raw_data.decode('utf-8'))
-
+ return json.loads(response[0].read().decode('utf-8'))
return {}
-
- except ApiException as e:
+ except Exception as e:
self.logger.error(f"Error getting languages: {e}")
return {}
- except Exception as e:
- self.logger.error(f"Unexpected error: {e}")
- return {}
def get_file_content(self, owner: str, repo: str, commit_sha: str, filepath: str) -> str:
"""Get raw file content from a specific commit"""
-
try:
- token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+ auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+ header_params = {'Authorization': auth_token} if auth_token else {}
url = f'/repos/{owner}/{repo}/raw/{filepath}'
- if token:
- url = f'{url}?token={token}&ref={commit_sha}'
+ # Fix: pass ref via the query string and the token via the Authorization header
+ if commit_sha:
+ url = f'{url}?ref={commit_sha}'
+
response = self.api_client.call_api(
- url,
- 'GET',
- path_params={},
- response_type=None,
- _return_http_data_only=False,
- _preload_content=False
+ url, 'GET', path_params={}, header_params=header_params,
+ response_type=None, _return_http_data_only=False, _preload_content=False
)
if hasattr(response, 'data'):
- raw_data = response.data.read()
- return raw_data.decode('utf-8')
+ return response.data.read().decode('utf-8')
elif isinstance(response, tuple):
- raw_data = response[0].read()
- return raw_data.decode('utf-8')
-
+ return response[0].read().decode('utf-8')
return ""
-
- except ApiException as e:
+ except Exception as e:
self.logger.error(f"Error getting file: {filepath}, content: {e}")
return ""
- except Exception as e:
- self.logger.error(f"Unexpected error: {e}")
- return ""
def get_issue_labels(self, owner: str, repo: str, issue_number: int):
- """Get labels assigned to the issue"""
- return self.issue.issue_get_labels(
- owner=owner,
- repo=repo,
- index=issue_number
- )
+ return self.issue.issue_get_labels(owner=owner, repo=repo, index=issue_number)
def list_all_commits(self, owner: str, repo: str):
- return self.repository.repo_get_all_commits(
- owner=owner,
- repo=repo
- )
+ return self.repository.repo_get_all_commits(owner=owner, repo=repo)
def add_reviewer(self, owner: str, repo: str, pr_number: int, reviewers: List[str]):
- body = {
- "reviewers": reviewers
- }
+ body = {"reviewers": reviewers}
return self.api_client.call_api(
'/repos/{owner}/{repo}/pulls/{pr_number}/requested_reviewers',
- 'POST',
- path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
- body=body,
- response_type='Repository',
- auth_settings=['AuthorizationHeaderToken']
+ 'POST', path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
+ body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
)
def add_reaction_comment(self, owner: str, repo: str, comment_id: int, reaction: str):
- body = {
- "content": reaction
- }
+ body = {"content": reaction}
return self.api_client.call_api(
'/repos/{owner}/{repo}/issues/comments/{id}/reactions',
- 'POST',
- path_params={'owner': owner, 'repo': repo, 'id': comment_id},
- body=body,
- response_type='Repository',
- auth_settings=['AuthorizationHeaderToken']
+ 'POST', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
+ body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
)
def remove_reaction_comment(self, owner: str, repo: str, comment_id: int):
return self.api_client.call_api(
'/repos/{owner}/{repo}/issues/comments/{id}/reactions',
- 'DELETE',
- path_params={'owner': owner, 'repo': repo, 'id': comment_id},
- response_type='Repository',
- auth_settings=['AuthorizationHeaderToken']
+ 'DELETE', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
+ response_type='Repository', auth_settings=['AuthorizationHeaderToken']
)
def add_labels(self, owner: str, repo: str, issue_number: int, labels: List[int]):
- body = {
- "labels": labels
- }
- return self.issue.issue_add_label(
- owner=owner,
- repo=repo,
- index=issue_number,
- body=body
- )
+ body = {"labels": labels}
+ return self.issue.issue_add_label(owner=owner, repo=repo, index=issue_number, body=body)
def get_pr_commits(self, owner: str, repo: str, pr_number: int):
- """Get all commits in a pull request"""
try:
- token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+ auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+ header_params = {'Authorization': auth_token} if auth_token else {}
url = f'/repos/{owner}/{repo}/pulls/{pr_number}/commits'
- if token:
- url = f'{url}?token={token}'
response = self.api_client.call_api(
- url,
- 'GET',
- path_params={},
- response_type=None,
- _return_http_data_only=False,
- _preload_content=False
+ url, 'GET', path_params={}, header_params=header_params,
+ response_type=None, _return_http_data_only=False, _preload_content=False
)
-
if hasattr(response, 'data'):
- raw_data = response.data.read()
- commits_data = json.loads(raw_data.decode('utf-8'))
- return commits_data
+ return json.loads(response.data.read().decode('utf-8'))
elif isinstance(response, tuple):
- raw_data = response[0].read()
- commits_data = json.loads(raw_data.decode('utf-8'))
- return commits_data
-
+ return json.loads(response[0].read().decode('utf-8'))
return []
-
- except ApiException as e:
+ except Exception as e:
self.logger.error(f"Error getting PR commits: {e}")
return []
- except Exception as e:
- self.logger.error(f"Unexpected error: {e}")
- return []
The same fix, applied at runtime with a Python script:
import os
import sys
# Location of the Gitea provider module that this script rewrites in place.
file_path = "/app/pr_agent/git_providers/gitea_provider.py"
print(f"Запуск патчинга файла: {file_path}")

# Load the current module source; a missing file is a hard failure (exit 1).
try:
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
except FileNotFoundError:
    print(f"Файл {file_path} не найден!")
    sys.exit(1)

# Header line of the class whose implementation is going to be replaced.
split_marker = "class RepoApi(giteapy.RepositoryApi):"
# The unpatched provider appends the token to request URLs as "?token=<value>";
# the replacement code never emits this substring.
legacy_auth_marker = "?token="

# Detect an already patched file first. The replacement class still starts
# with split_marker and still contains "auth_token =", so neither the marker
# nor the substring "token =" can tell a patched file from an unpatched one;
# the legacy query-string usage can.
if legacy_auth_marker not in content:
    print("Похоже, файл уже пропатчен.")
    sys.exit(0)

# The file still uses query-string auth but has an unexpected layout.
if split_marker not in content:
    print("Ошибка: Не могу найти класс RepoApi для замены.")
    sys.exit(1)

# Keep only the text that precedes the first occurrence of the class header.
# NOTE(review): everything from the header to the end of the file is dropped,
# which assumes RepoApi is the last definition in the module - confirm.
base_content = content.partition(split_marker)[0]
# Replacement source for the RepoApi class (safe for Gitea 1.22+).
# Every direct call_api() request passes the configured 'Authorization' value
# through header_params instead of appending "?token=..." to the URL.
# The text is appended verbatim to the kept part of the provider module, so it
# must be valid, correctly indented Python on its own; the names it references
# (giteapy, get_logger, json, ApiException, List, Dict, Any) are expected to be
# imported by the part of the module that is kept.
new_repo_api_code = """class RepoApi(giteapy.RepositoryApi):
    def __init__(self, client: giteapy.ApiClient):
        self.repository = giteapy.RepositoryApi(client)
        self.issue = giteapy.IssueApi(client)
        self.logger = get_logger()
        super().__init__(client)

    def create_inline_comment(self, owner: str, repo: str, pr_number: int, body : str ,commit_id : str, comments: List[Dict[str, Any]]) -> None:
        body = {
            "body": body,
            "comments": comments,
            "commit_id": commit_id,
        }
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/pulls/{pr_number}/reviews',
            'POST',
            path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
            body=body,
            response_type='Repository',
            auth_settings=['AuthorizationHeaderToken']
        )

    def create_comment(self, owner: str, repo: str, index: int, comment: str):
        body = {
            "body": comment
        }
        return self.issue.issue_create_comment(
            owner=owner,
            repo=repo,
            index=index,
            body=body
        )

    def edit_comment(self, owner: str, repo: str, comment_id: int, comment: str):
        body = {
            "body": comment
        }
        return self.issue.issue_edit_comment(
            owner=owner,
            repo=repo,
            id=comment_id,
            body=body
        )

    def remove_comment(self, owner: str, repo: str, comment_id: int):
        return self.issue.issue_delete_comment(
            owner=owner,
            repo=repo,
            id=comment_id
        )

    def list_all_comments(self, owner: str, repo: str, index: int):
        return self.issue.issue_get_comments(
            owner=owner,
            repo=repo,
            index=index
        )

    def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> str:
        \"\"\"Get the diff content of a pull request using direct API call\"\"\"
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}.diff'
            response = self.api_client.call_api(
                url,
                'GET',
                path_params={},
                header_params=header_params,
                response_type=None,
                _return_http_data_only=False,
                _preload_content=False
            )
            if hasattr(response, 'data'):
                raw_data = response.data.read()
                return raw_data.decode('utf-8')
            elif isinstance(response, tuple):
                raw_data = response[0].read()
                return raw_data.decode('utf-8')
            else:
                error_msg = f"Unexpected response format: {type(response)}"
                self.logger.error(error_msg)
                raise RuntimeError(error_msg)
        except ApiException as e:
            self.logger.error(f"Error getting diff: {str(e)}")
            raise e
        except Exception as e:
            self.logger.error(f"Unexpected error: {str(e)}")
            raise e

    def get_pull_request(self, owner: str, repo: str, pr_number: int):
        return self.repository.repo_get_pull_request(owner=owner, repo=repo, index=pr_number)

    def edit_pull_request(self, owner: str, repo: str, pr_number: int, title : str, body: str):
        body = {"body": body, "title" : title}
        return self.repository.repo_edit_pull_request(owner=owner, repo=repo, index=pr_number, body=body)

    def get_change_file_pull_request(self, owner: str, repo: str, pr_number: int):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}/files'
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                raw_data = response.data.read()
                diff_content = raw_data.decode('utf-8')
                return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
            elif isinstance(response, tuple):
                raw_data = response[0].read()
                diff_content = raw_data.decode('utf-8')
                return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
            return []
        except Exception as e:
            self.logger.error(f"Error getting changed files: {e}")
            return []

    def get_languages(self, owner: str, repo: str):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/languages'
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                return json.loads(response.data.read().decode('utf-8'))
            elif isinstance(response, tuple):
                return json.loads(response[0].read().decode('utf-8'))
            return {}
        except Exception as e:
            self.logger.error(f"Error getting languages: {e}")
            return {}

    def get_file_content(self, owner: str, repo: str, commit_sha: str, filepath: str) -> str:
        \"\"\"Get raw file content from a specific commit\"\"\"
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/raw/{filepath}'
            # Исправление: ref передаем через query, токен через header
            if commit_sha:
                url = f'{url}?ref={commit_sha}'
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                return response.data.read().decode('utf-8')
            elif isinstance(response, tuple):
                return response[0].read().decode('utf-8')
            return ""
        except Exception as e:
            self.logger.error(f"Error getting file: {filepath}, content: {e}")
            return ""

    def get_issue_labels(self, owner: str, repo: str, issue_number: int):
        return self.issue.issue_get_labels(owner=owner, repo=repo, index=issue_number)

    def list_all_commits(self, owner: str, repo: str):
        return self.repository.repo_get_all_commits(owner=owner, repo=repo)

    def add_reviewer(self, owner: str, repo: str, pr_number: int, reviewers: List[str]):
        body = {"reviewers": reviewers}
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/pulls/{pr_number}/requested_reviewers',
            'POST', path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def add_reaction_comment(self, owner: str, repo: str, comment_id: int, reaction: str):
        body = {"content": reaction}
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
            'POST', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def remove_reaction_comment(self, owner: str, repo: str, comment_id: int):
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
            'DELETE', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
            response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def add_labels(self, owner: str, repo: str, issue_number: int, labels: List[int]):
        body = {"labels": labels}
        return self.issue.issue_add_label(owner=owner, repo=repo, index=issue_number, body=body)

    def get_pr_commits(self, owner: str, repo: str, pr_number: int):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}/commits'
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                return json.loads(response.data.read().decode('utf-8'))
            elif isinstance(response, tuple):
                return json.loads(response[0].read().decode('utf-8'))
            return []
        except Exception as e:
            self.logger.error(f"Error getting PR commits: {e}")
            return []
"""
# Assemble the patched module: the untouched prefix of the original file
# followed by the replacement RepoApi source, then overwrite the file in place.
final_content = "".join((base_content, new_repo_api_code))
with open(file_path, mode='w', encoding='utf-8') as patched_file:
    patched_file.write(final_content)
print("Патч успешно применен!")
Sorry, I do not have time to open a PR.
Metadata
Metadata
Assignees
Labels
No labels