Skip to content

gitea_provider.py outdated, not working with latest Gitea #2122

@delfer

Description

@delfer

Git provider

Other

System Info

Gitea: 1.25.1
Docker image: codiumai/pr-agent:0.31-gitea_app

Bug details

Error while getting files from PR:

'X-Gitea-Warning': 'token and access_token API authentication is deprecated and will be removed in gitea 1.23. Please use AuthorizationHeaderToken instead. Existing queries will continue to work but without authorization.'

Simple fix for /app/pr_agent/git_providers/gitea_provider.py

--- v0.31/pr_agent/git_providers/gitea_provider.py	2025-11-28 14:23:21
+++ gitea-fix/pr_agent/git_providers/gitea_provider.py	2025-11-28 14:24:30
@@ -551,7 +551,7 @@
         if not self.pr:
             self.logger.error("Failed to get PR branch")
             return ""
-
+
         if not self.pr.head:
             self.logger.error("PR head not found")
             return ""
@@ -747,15 +747,15 @@
     def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> str:
         """Get the diff content of a pull request using direct API call"""
         try:
-            token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+            header_params = {'Authorization': auth_token} if auth_token else {}
             url = f'/repos/{owner}/{repo}/pulls/{pr_number}.diff'
-            if token:
-                url = f'{url}?token={token}'

             response = self.api_client.call_api(
                 url,
                 'GET',
                 path_params={},
+                header_params=header_params,
                 response_type=None,
                 _return_http_data_only=False,
                 _preload_content=False
@@ -768,7 +768,7 @@
                 raw_data = response[0].read()
                 return raw_data.decode('utf-8')
             else:
-                error_msg = f"Unexpected response format received from API: {type(response)}"
+                error_msg = f"Unexpected response format: {type(response)}"
                 self.logger.error(error_msg)
                 raise RuntimeError(error_msg)

@@ -780,41 +780,21 @@
             raise e

     def get_pull_request(self, owner: str, repo: str, pr_number: int):
-        """Get pull request details including description"""
-        return self.repository.repo_get_pull_request(
-            owner=owner,
-            repo=repo,
-            index=pr_number
-        )
+        return self.repository.repo_get_pull_request(owner=owner, repo=repo, index=pr_number)

-    def edit_pull_request(self, owner: str, repo: str, pr_number: int,title : str, body: str):
-        """Edit pull request description"""
-        body = {
-            "body": body,
-            "title" : title
-        }
-        return self.repository.repo_edit_pull_request(
-            owner=owner,
-            repo=repo,
-            index=pr_number,
-            body=body
-        )
+    def edit_pull_request(self, owner: str, repo: str, pr_number: int, title : str, body: str):
+        body = {"body": body, "title" : title}
+        return self.repository.repo_edit_pull_request(owner=owner, repo=repo, index=pr_number, body=body)

     def get_change_file_pull_request(self, owner: str, repo: str, pr_number: int):
-        """Get changed files in the pull request"""
         try:
-            token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+            header_params = {'Authorization': auth_token} if auth_token else {}
             url = f'/repos/{owner}/{repo}/pulls/{pr_number}/files'
-            if token:
-                url = f'{url}?token={token}'

             response = self.api_client.call_api(
-                url,
-                'GET',
-                path_params={},
-                response_type=None,
-                _return_http_data_only=False,
-                _preload_content=False
+                url, 'GET', path_params={}, header_params=header_params,
+                response_type=None, _return_http_data_only=False, _preload_content=False
             )

             if hasattr(response, 'data'):
@@ -825,174 +805,103 @@
                 raw_data = response[0].read()
                 diff_content = raw_data.decode('utf-8')
                 return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
-
             return []
-
-        except ApiException as e:
+        except Exception as e:
             self.logger.error(f"Error getting changed files: {e}")
             return []
-        except Exception as e:
-            self.logger.error(f"Unexpected error: {e}")
-            return []

     def get_languages(self, owner: str, repo: str):
-        """Get programming languages used in the repository"""
         try:
-            token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+            header_params = {'Authorization': auth_token} if auth_token else {}
             url = f'/repos/{owner}/{repo}/languages'
-            if token:
-                url = f'{url}?token={token}'

             response = self.api_client.call_api(
-                url,
-                'GET',
-                path_params={},
-                response_type=None,
-                _return_http_data_only=False,
-                _preload_content=False
+                url, 'GET', path_params={}, header_params=header_params,
+                response_type=None, _return_http_data_only=False, _preload_content=False
             )
-
             if hasattr(response, 'data'):
-                raw_data = response.data.read()
-                return json.loads(raw_data.decode('utf-8'))
+                return json.loads(response.data.read().decode('utf-8'))
             elif isinstance(response, tuple):
-                raw_data = response[0].read()
-                return json.loads(raw_data.decode('utf-8'))
-
+                return json.loads(response[0].read().decode('utf-8'))
             return {}
-
-        except ApiException as e:
+        except Exception as e:
             self.logger.error(f"Error getting languages: {e}")
             return {}
-        except Exception as e:
-            self.logger.error(f"Unexpected error: {e}")
-            return {}

     def get_file_content(self, owner: str, repo: str, commit_sha: str, filepath: str) -> str:
         """Get raw file content from a specific commit"""
-
         try:
-            token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+            header_params = {'Authorization': auth_token} if auth_token else {}
             url = f'/repos/{owner}/{repo}/raw/{filepath}'
-            if token:
-                url = f'{url}?token={token}&ref={commit_sha}'

+            # Исправление: ref передаем через query, токен через header
+            if commit_sha:
+                url = f'{url}?ref={commit_sha}'
+
             response = self.api_client.call_api(
-                url,
-                'GET',
-                path_params={},
-                response_type=None,
-                _return_http_data_only=False,
-                _preload_content=False
+                url, 'GET', path_params={}, header_params=header_params,
+                response_type=None, _return_http_data_only=False, _preload_content=False
             )

             if hasattr(response, 'data'):
-                raw_data = response.data.read()
-                return raw_data.decode('utf-8')
+                return response.data.read().decode('utf-8')
             elif isinstance(response, tuple):
-                raw_data = response[0].read()
-                return raw_data.decode('utf-8')
-
+                return response[0].read().decode('utf-8')
             return ""
-
-        except ApiException as e:
+        except Exception as e:
             self.logger.error(f"Error getting file: {filepath}, content: {e}")
             return ""
-        except Exception as e:
-            self.logger.error(f"Unexpected error: {e}")
-            return ""

     def get_issue_labels(self, owner: str, repo: str, issue_number: int):
-        """Get labels assigned to the issue"""
-        return self.issue.issue_get_labels(
-            owner=owner,
-            repo=repo,
-            index=issue_number
-        )
+        return self.issue.issue_get_labels(owner=owner, repo=repo, index=issue_number)

     def list_all_commits(self, owner: str, repo: str):
-        return self.repository.repo_get_all_commits(
-            owner=owner,
-            repo=repo
-        )
+        return self.repository.repo_get_all_commits(owner=owner, repo=repo)

     def add_reviewer(self, owner: str, repo: str, pr_number: int, reviewers: List[str]):
-        body = {
-            "reviewers": reviewers
-        }
+        body = {"reviewers": reviewers}
         return self.api_client.call_api(
             '/repos/{owner}/{repo}/pulls/{pr_number}/requested_reviewers',
-            'POST',
-            path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
-            body=body,
-            response_type='Repository',
-            auth_settings=['AuthorizationHeaderToken']
+            'POST', path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
+            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
         )

     def add_reaction_comment(self, owner: str, repo: str, comment_id: int, reaction: str):
-        body = {
-            "content": reaction
-        }
+        body = {"content": reaction}
         return self.api_client.call_api(
             '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
-            'POST',
-            path_params={'owner': owner, 'repo': repo, 'id': comment_id},
-            body=body,
-            response_type='Repository',
-            auth_settings=['AuthorizationHeaderToken']
+            'POST', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
+            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
         )

     def remove_reaction_comment(self, owner: str, repo: str, comment_id: int):
         return self.api_client.call_api(
             '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
-            'DELETE',
-            path_params={'owner': owner, 'repo': repo, 'id': comment_id},
-            response_type='Repository',
-            auth_settings=['AuthorizationHeaderToken']
+            'DELETE', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
+            response_type='Repository', auth_settings=['AuthorizationHeaderToken']
         )

     def add_labels(self, owner: str, repo: str, issue_number: int, labels: List[int]):
-        body = {
-            "labels": labels
-        }
-        return self.issue.issue_add_label(
-            owner=owner,
-            repo=repo,
-            index=issue_number,
-            body=body
-        )
+        body = {"labels": labels}
+        return self.issue.issue_add_label(owner=owner, repo=repo, index=issue_number, body=body)

     def get_pr_commits(self, owner: str, repo: str, pr_number: int):
-        """Get all commits in a pull request"""
         try:
-            token = self.api_client.configuration.api_key.get('Authorization', '').replace('token ', '')
+            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
+            header_params = {'Authorization': auth_token} if auth_token else {}
             url = f'/repos/{owner}/{repo}/pulls/{pr_number}/commits'
-            if token:
-                url = f'{url}?token={token}'

             response = self.api_client.call_api(
-                url,
-                'GET',
-                path_params={},
-                response_type=None,
-                _return_http_data_only=False,
-                _preload_content=False
+                url, 'GET', path_params={}, header_params=header_params,
+                response_type=None, _return_http_data_only=False, _preload_content=False
             )
-
             if hasattr(response, 'data'):
-                raw_data = response.data.read()
-                commits_data = json.loads(raw_data.decode('utf-8'))
-                return commits_data
+                return json.loads(response.data.read().decode('utf-8'))
             elif isinstance(response, tuple):
-                raw_data = response[0].read()
-                commits_data = json.loads(raw_data.decode('utf-8'))
-                return commits_data
-
+                return json.loads(response[0].read().decode('utf-8'))
             return []
-
-        except ApiException as e:
+        except Exception as e:
             self.logger.error(f"Error getting PR commits: {e}")
             return []
-        except Exception as e:
-            self.logger.error(f"Unexpected error: {e}")
-            return []

Same fix but in runtime using Python:

import os
import sys

file_path = "/app/pr_agent/git_providers/gitea_provider.py"

print(f"Запуск патчинга файла: {file_path}")

try:
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
except FileNotFoundError:
    print(f"Файл {file_path} не найден!")
    sys.exit(1)

# Ищем маркер начала класса
split_marker = "class RepoApi(giteapy.RepositoryApi):"
if split_marker not in content:
    # Если маркера нет, проверим, может файл уже пропатчен или структура другая
    if "AuthorizationHeaderToken" in content and "token =" not in content:
            print("Похоже, файл уже пропатчен.")
            sys.exit(0)
    print("Ошибка: Не могу найти класс RepoApi для замены.")
    sys.exit(1)

# Отрезаем старую реализацию
base_content = content.split(split_marker)[0]

# Новая реализация класса (безопасная для Gitea 1.22+)
new_repo_api_code = """class RepoApi(giteapy.RepositoryApi):
    def __init__(self, client: giteapy.ApiClient):
        self.repository = giteapy.RepositoryApi(client)
        self.issue = giteapy.IssueApi(client)
        self.logger = get_logger()
        super().__init__(client)

    def create_inline_comment(self, owner: str, repo: str, pr_number: int, body : str ,commit_id : str, comments: List[Dict[str, Any]]) -> None:
        body = {
            "body": body,
            "comments": comments,
            "commit_id": commit_id,
        }
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/pulls/{pr_number}/reviews',
            'POST',
            path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
            body=body,
            response_type='Repository',
            auth_settings=['AuthorizationHeaderToken']
        )

    def create_comment(self, owner: str, repo: str, index: int, comment: str):
        body = {
            "body": comment
        }
        return self.issue.issue_create_comment(
            owner=owner,
            repo=repo,
            index=index,
            body=body
        )

    def edit_comment(self, owner: str, repo: str, comment_id: int, comment: str):
        body = {
            "body": comment
        }
        return self.issue.issue_edit_comment(
            owner=owner,
            repo=repo,
            id=comment_id,
            body=body
        )

    def remove_comment(self, owner: str, repo: str, comment_id: int):
        return self.issue.issue_delete_comment(
            owner=owner,
            repo=repo,
            id=comment_id
        )

    def list_all_comments(self, owner: str, repo: str, index: int):
        return self.issue.issue_get_comments(
            owner=owner,
            repo=repo,
            index=index
        )

    def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> str:
        \"\"\"Get the diff content of a pull request using direct API call\"\"\"
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}.diff'
            
            response = self.api_client.call_api(
                url,
                'GET',
                path_params={},
                header_params=header_params,
                response_type=None,
                _return_http_data_only=False,
                _preload_content=False
            )

            if hasattr(response, 'data'):
                raw_data = response.data.read()
                return raw_data.decode('utf-8')
            elif isinstance(response, tuple):
                raw_data = response[0].read()
                return raw_data.decode('utf-8')
            else:
                error_msg = f"Unexpected response format: {type(response)}"
                self.logger.error(error_msg)
                raise RuntimeError(error_msg)

        except ApiException as e:
            self.logger.error(f"Error getting diff: {str(e)}")
            raise e
        except Exception as e:
            self.logger.error(f"Unexpected error: {str(e)}")
            raise e

    def get_pull_request(self, owner: str, repo: str, pr_number: int):
        return self.repository.repo_get_pull_request(owner=owner, repo=repo, index=pr_number)

    def edit_pull_request(self, owner: str, repo: str, pr_number: int, title : str, body: str):
        body = {"body": body, "title" : title}
        return self.repository.repo_edit_pull_request(owner=owner, repo=repo, index=pr_number, body=body)

    def get_change_file_pull_request(self, owner: str, repo: str, pr_number: int):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}/files'

            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )

            if hasattr(response, 'data'):
                raw_data = response.data.read()
                diff_content = raw_data.decode('utf-8')
                return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
            elif isinstance(response, tuple):
                raw_data = response[0].read()
                diff_content = raw_data.decode('utf-8')
                return json.loads(diff_content) if isinstance(diff_content, str) else diff_content
            return []
        except Exception as e:
            self.logger.error(f"Error getting changed files: {e}")
            return []

    def get_languages(self, owner: str, repo: str):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/languages'
            
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                return json.loads(response.data.read().decode('utf-8'))
            elif isinstance(response, tuple):
                return json.loads(response[0].read().decode('utf-8'))
            return {}
        except Exception as e:
            self.logger.error(f"Error getting languages: {e}")
            return {}

    def get_file_content(self, owner: str, repo: str, commit_sha: str, filepath: str) -> str:
        \"\"\"Get raw file content from a specific commit\"\"\"
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/raw/{filepath}'
            
            # Исправление: ref передаем через query, токен через header
            if commit_sha:
                url = f'{url}?ref={commit_sha}'

            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )

            if hasattr(response, 'data'):
                return response.data.read().decode('utf-8')
            elif isinstance(response, tuple):
                return response[0].read().decode('utf-8')
            return ""
        except Exception as e:
            self.logger.error(f"Error getting file: {filepath}, content: {e}")
            return ""

    def get_issue_labels(self, owner: str, repo: str, issue_number: int):
        return self.issue.issue_get_labels(owner=owner, repo=repo, index=issue_number)

    def list_all_commits(self, owner: str, repo: str):
        return self.repository.repo_get_all_commits(owner=owner, repo=repo)

    def add_reviewer(self, owner: str, repo: str, pr_number: int, reviewers: List[str]):
        body = {"reviewers": reviewers}
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/pulls/{pr_number}/requested_reviewers',
            'POST', path_params={'owner': owner, 'repo': repo, 'pr_number': pr_number},
            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def add_reaction_comment(self, owner: str, repo: str, comment_id: int, reaction: str):
        body = {"content": reaction}
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
            'POST', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
            body=body, response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def remove_reaction_comment(self, owner: str, repo: str, comment_id: int):
        return self.api_client.call_api(
            '/repos/{owner}/{repo}/issues/comments/{id}/reactions',
            'DELETE', path_params={'owner': owner, 'repo': repo, 'id': comment_id},
            response_type='Repository', auth_settings=['AuthorizationHeaderToken']
        )

    def add_labels(self, owner: str, repo: str, issue_number: int, labels: List[int]):
        body = {"labels": labels}
        return self.issue.issue_add_label(owner=owner, repo=repo, index=issue_number, body=body)

    def get_pr_commits(self, owner: str, repo: str, pr_number: int):
        try:
            auth_token = self.api_client.configuration.api_key.get('Authorization', '')
            header_params = {'Authorization': auth_token} if auth_token else {}
            url = f'/repos/{owner}/{repo}/pulls/{pr_number}/commits'
            
            response = self.api_client.call_api(
                url, 'GET', path_params={}, header_params=header_params,
                response_type=None, _return_http_data_only=False, _preload_content=False
            )
            if hasattr(response, 'data'):
                return json.loads(response.data.read().decode('utf-8'))
            elif isinstance(response, tuple):
                return json.loads(response[0].read().decode('utf-8'))
            return []
        except Exception as e:
            self.logger.error(f"Error getting PR commits: {e}")
            return []
"""

final_content = base_content + new_repo_api_code

with open(file_path, 'w', encoding='utf-8') as f:
    f.write(final_content)

print("Патч успешно применен!")

Sry, do not have time to make PR(

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions